In this section, we will extend our Chicago crime use case to design and code the different layers of Lambda Architecture in Spark.
Let's extend our Chicago crime dataset and assume that the Chicago crime data is delivered in near real-time. Our custom consumers will consume this data and need to find out the number of crimes for each crime category. In most cases, users will require this grouping only for the chunk of data received in near real-time, but, in a few use cases, the aggregation also needs to be performed over the historical data.
Seems like a Lambda use case, doesn't it?
Let's first analyze the complete architecture with all of its components, and then we will describe, code, and execute each component of the Lambda Architecture.
In this section, we will discuss the high-level architecture of our Chicago crime use case that is developed using the principles of Lambda Architecture.
We will leverage Spark Streaming and Spark batches along with Cassandra to realize our Lambda Architecture, but users are free to use any technology as per the technology matrix defined in the Technology matrix for Lambda Architecture section.
The preceding illustration shows a high-level architecture and its various components, which are used to develop the Lambda Architecture for the Chicago crime dataset.
Let's move forward and discuss the role of each of the components:
- Producer: A custom Java producer that reads the Chicago crime records from a file and delivers them over a socket in near real-time.
- Real-time layer: A Spark Streaming job that consumes the data from the socket and appends the raw records to keyspace lambdaarchitecture and table masterdata. The same job also aggregates each batch by crime type and persists the counts in keyspace lambdaarchitecture and table realtimedata. This table will contain the aggregated data for each batch received by our socket streams.
- Batch layer: A Spark batch job that reads the complete data from masterdata and performs the grouping operation to find out the count of each distinct crime type. The grouped records are persisted in keyspace lambdaarchitecture and table batchviewdata. The job finally truncates realtimedata, assuming that the records in this table are already updated in masterdata.
- Serving layer: A Spark batch job that reads the data from both batchviewdata and realtimedata. It performs the grouping on the data for each distinct crime type and prints it on the console.

Now, let's move to the next section where we will code each of these components.
Perform the following steps to configure Apache Cassandra and integrate our Spark cluster with Cassandra:
Define a new environment variable, CASSANDRA_HOME, which will point to the directory where we have extracted the downloaded archive file:

export CASSANDRA_HOME=<location of the extracted archive>

Next, execute the following command to start the Cassandra server:

$CASSANDRA_HOME/bin/cassandra
The preceding command will bring up our Cassandra database, which is now ready to serve user requests, but, before that, let's create the keyspace and tables to store our data.
Open a new console and execute the following command to bring up the CQLSH console:

$CASSANDRA_HOME/bin/cqlsh
CQLSH is the command-line utility that provides SQL-like syntax to perform CRUD operations on Cassandra databases.
Execute the following statements in CQLSH to create our keyspace and the three tables:

CREATE KEYSPACE lambdaarchitecture WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

CREATE TABLE lambdaarchitecture.masterdata (
  id varchar,
  casenumber varchar,
  date varchar,
  block varchar,
  iucr varchar,
  primarytype varchar,
  description varchar,
  PRIMARY KEY (id)
);

CREATE TABLE lambdaarchitecture.realtimedata (
  id bigint,
  primarytype varchar,
  count int,
  PRIMARY KEY (id)
);

CREATE TABLE lambdaarchitecture.batchviewdata (
  primarytype varchar,
  count int,
  PRIMARY KEY (primarytype)
);
Next, we will configure and integrate our Spark cluster to leverage the Spark-Cassandra APIs for performing CRUD operations:
Define the $SPARK_HOME environment variable, pointing to the folder we extracted the Spark binaries to:

export SPARK_HOME=<location of the extracted Spark archive>

Next, copy the Spark-Cassandra connector JARs and their dependencies into the $CASSANDRA_HOME/lib directory. Finally, open the $SPARK_HOME/conf/spark-defaults.conf file and define the following configuration properties (classpath entries are separated by colons):

spark.driver.extraClassPath=$CASSANDRA_HOME/lib/apache-cassandra-2.1.7.jar:$CASSANDRA_HOME/lib/apache-cassandra-clientutil-2.1.7.jar:$CASSANDRA_HOME/lib/apache-cassandra-thrift-2.1.7.jar:$CASSANDRA_HOME/lib/cassandra-driver-internal-only-2.5.1.zip:$CASSANDRA_HOME/lib/thrift-server-0.3.7.jar:$CASSANDRA_HOME/lib/guava-16.0.jar:$CASSANDRA_HOME/lib/joda-convert-1.2.jar:$CASSANDRA_HOME/lib/joda-time-2.3.jar:$CASSANDRA_HOME/lib/jsr166e-1.1.0.jar:$CASSANDRA_HOME/lib/spark-cassandra-connector_2.10-1.4.0.jar:$CASSANDRA_HOME/lib/spark-cassandra-connector-java_2.10-1.4.0.jar:$CASSANDRA_HOME/lib/cassandra-driver-core-2.1.9.jar

spark.executor.extraClassPath=$CASSANDRA_HOME/lib/apache-cassandra-2.1.7.jar:$CASSANDRA_HOME/lib/apache-cassandra-clientutil-2.1.7.jar:$CASSANDRA_HOME/lib/apache-cassandra-thrift-2.1.7.jar:$CASSANDRA_HOME/lib/cassandra-driver-internal-only-2.5.1.zip:$CASSANDRA_HOME/lib/thrift-server-0.3.7.jar:$CASSANDRA_HOME/lib/guava-16.0.jar:$CASSANDRA_HOME/lib/joda-convert-1.2.jar:$CASSANDRA_HOME/lib/joda-time-2.3.jar:$CASSANDRA_HOME/lib/jsr166e-1.1.0.jar:$CASSANDRA_HOME/lib/spark-cassandra-connector_2.10-1.4.0.jar:$CASSANDRA_HOME/lib/spark-cassandra-connector-java_2.10-1.4.0.jar:$CASSANDRA_HOME/lib/cassandra-driver-core-2.1.9.jar

spark.cassandra.connection.host=localhost
Replace $CASSANDRA_HOME with the actual location on the filesystem and save the file.
Refer to https://github.com/datastax/spark-cassandra-connector for more information on the Spark-Cassandra connector.
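Before moving on, we can smoke-test the integration from the Spark shell ($SPARK_HOME/bin/spark-shell). The following is a minimal sketch, assuming Cassandra is running locally and the keyspace created above exists; a count of zero simply confirms connectivity:

// The connector import adds cassandraTable(...) to SparkContext
import com.datastax.spark.connector._

// Read the (currently empty) master table as an RDD of CassandraRow
val masterRdd = sc.cassandraTable("lambdaarchitecture", "masterdata")
println("Rows in masterdata = " + masterRdd.count())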
We are done with the configuration and integration of Spark with Cassandra. Now, we will move to the next section where we will code the producer and other layers/jobs.
Perform the following steps to code a custom producer in Java:
Open the Spark-Examples project in Eclipse, and add a new package and class called chapter.ten.producer.CrimeProducer.java. Next, open CrimeProducer.java and add the following piece of code in it:

package chapter.ten.producer;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;

public class CrimeProducer {

  public static void main(String[] args) {
    if (args == null || args.length < 1) {
      System.out.println("Usage - java chapter.ten.producer.CrimeProducer <port#>");
      System.exit(0);
    }
    System.out.println("Defining new Socket on " + args[0]);
    try (ServerSocket soc = new ServerSocket(Integer.parseInt(args[0]))) {
      System.out.println("Waiting for Incoming Connection on - " + args[0]);
      Socket clientSocket = soc.accept();
      System.out.println("Connection Received");
      OutputStream outputStream = clientSocket.getOutputStream();
      // Path of the file from where we need to read crime records
      String filePath = "/home/ec2-user/softwares/crime-data/Crimes_-Aug-2015.csv";
      PrintWriter out = new PrintWriter(outputStream, true);
      BufferedReader brReader = new BufferedReader(new FileReader(filePath));
      // Random number generator, to read a different number of records each time
      Random number = new Random();
      // Keep reading the data in an infinite loop and send it over the socket
      while (true) {
        System.out.println("Reading Crime Records");
        StringBuilder dataBuilder = new StringBuilder();
        // Get a new random integer between 0 and 20
        int recordsToRead = number.nextInt(20);
        System.out.println("Records to Read = " + recordsToRead);
        for (int i = 0; i < recordsToRead; i++) {
          String dataLine = brReader.readLine() + " ";
          dataBuilder.append(dataLine);
        }
        System.out.println("Data received and now writing it to Socket");
        out.println(dataBuilder);
        out.flush();
        // Sleep for 20 seconds before reading again
        Thread.sleep(20000);
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
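Before wiring the producer to Spark Streaming, it can be handy to sanity-check it with a bare socket client. The following is a minimal sketch in Scala; the object name is hypothetical, and port 9047 matches the one we will use later:

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

object ProducerSmokeTest {
  def main(args: Array[String]) {
    // Connect to the producer started on port 9047
    val socket = new Socket("localhost", 9047)
    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
    // Print the first three batches pushed by the producer (one line per batch)
    for (_ <- 1 to 3) {
      println(reader.readLine())
    }
    socket.close()
  }
}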
Perform the following steps to code the real-time layer in Scala:
Open the Spark-Examples project in Eclipse, and add a new Scala package and object to it named chapter.ten.dataConsumption.LADataConsumptionJob.scala. LADataConsumptionJob will be our Spark Streaming job that will listen to the socket and consume data as it arrives. So, apart from creating StreamingContext and leveraging socketTextStream for consuming data, we will also define two functions: one for persisting/appending data in the master table (masterdata) in Cassandra, and another for grouping the same data to find out the count of distinct crime types and finally persisting it into the real-time view table (realtimedata) in Cassandra. As the first step, define persistInMaster(…) for persisting the raw data into the master table:

// Required so that saveToCassandra(...) and SomeColumns are available
import com.datastax.spark.connector._

def persistInMaster(streamCtx: StreamingContext, lines: DStream[String]) {
  // Define the keyspace
  val keyspaceName = "lambdaarchitecture"
  // Define the table for persisting master records
  val csMasterTableName = "masterdata"
  lines.foreachRDD { x =>
    // Split, flatten, and finally filter to exclude any empty rows
    val rawCrimeRDD = x.map(_.split(" ")).flatMap { x => x }.filter { x => x.length() > 2 }
    println("Master Data Received = " + rawCrimeRDD.collect().length)
    // Split again for each distinct value in the row and create a Scala Seq
    val splitCrimeRDD = rawCrimeRDD.map { x => x.split(",") }.map(c => createSeq(c))
    println("Now creating Sequence Persisting")
    // Finally, flatten the results and persist them in the table
    val crimeRDD = splitCrimeRDD.flatMap(f => f)
    crimeRDD.saveToCassandra(keyspaceName, csMasterTableName,
      SomeColumns("id", "casenumber", "date", "block", "iucr", "primarytype", "description"))
  }
}
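The preceding function depends on a createSeq(…) helper, and the next function will depend on a Crime case class; neither is shown above. The following is a minimal sketch of both, assuming the seven columns of our masterdata table; your actual definitions may differ:

// Hypothetical sketch: a case class mirroring the seven columns of masterdata
case class Crime(id: String, casenumber: String, date: String, block: String,
    iucr: String, primarytype: String, description: String)

// Hypothetical sketch: wraps the first seven CSV fields into a single-element
// Seq of tuples, so that flatMap(f => f) yields rows that saveToCassandra
// can map positionally onto SomeColumns("id", ..., "description")
def createSeq(c: Array[String]): Seq[(String, String, String, String, String, String, String)] =
  Seq((c(0), c(1), c(2), c(3), c(4), c(5), c(6)))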
persistInMaster(…) appends the raw data to the master table (masterdata) in Cassandra. Now, define generateRealTimeView(…), which will aggregate the raw crime data (RDD of strings) and persist it into the real-time table (realtimedata) in Cassandra:

// Requires: import org.apache.spark.sql.{Column, SQLContext, SaveMode}
def generateRealTimeView(streamCtx: StreamingContext, lines: DStream[String]) {
  // Define the keyspace
  val keyspaceName = "lambdaarchitecture"
  // Define the table for persisting the processed records
  val csRealTimeTableName = "realtimedata"
  lines.foreachRDD { x =>
    // Split, flatten, and finally filter to exclude any empty rows
    val rawCrimeRDD = x.map(_.split(" ")).flatMap { x => x }.filter { x => x.length() > 2 }
    println("Real Time Data Received = " + rawCrimeRDD.collect().length)
    // Split again for each distinct value in the row
    val splitCrimeRDD = rawCrimeRDD.map { x => x.split(",") }
    // Convert the RDD of strings to an RDD of Crime objects
    val crimeRDD = splitCrimeRDD.map(c => Crime(c(0), c(1), c(2), c(3), c(4), c(5), c(6)))
    val sqlCtx = getInstance(streamCtx.sparkContext)
    // Use dynamic mapping for creating the DataFrame
    import sqlCtx.implicits._
    // Convert the RDD to a DataFrame
    val dataFrame = crimeRDD.toDF()
    // Perform a few operations on the DataFrame
    println("Number of Rows in Data Frame = " + dataFrame.count())
    // Perform the Group By operation using the DataFrame API
    val rtViewFrame = dataFrame.groupBy("primarytype").count()
    // Add a new column to the DataFrame for the primary key
    val finalRTViewFrame = rtViewFrame.withColumn("id", new Column("count") + System.nanoTime)
    // Print the records that will be persisted in Cassandra
    println("showing records which will be persisted into the realtime view")
    finalRTViewFrame.show(10)
    // Leverage DataFrame.write for persisting/appending the complete DataFrame
    finalRTViewFrame.write.format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "realtimedata", "keyspace" -> "lambdaarchitecture"))
      .mode(SaveMode.Append).save()
  }
}

// Singleton SQLContext variable
@transient private var instance: SQLContext = null

// Lazy initialization of the SQLContext
def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
  if (instance == null) {
    instance = new SQLContext(sparkContext)
  }
  instance
}
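For completeness, here is a minimal sketch of how the main method of LADataConsumptionJob might wire these pieces together; the application name, the 20-second batch interval (matching the producer's sleep), and the localhost hostname are assumptions, not taken from the original code:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def main(args: Array[String]) {
  // args(0) carries the port our producer is bound to (9047 in our runs)
  val conf = new SparkConf().setAppName("LADataConsumptionJob")
  // Batch interval assumed to be 20 seconds, matching the producer's emit cycle
  val streamCtx = new StreamingContext(conf, Seconds(20))
  // Consume raw crime records from the socket as a DStream of strings
  val lines = streamCtx.socketTextStream("localhost", args(0).toInt)
  // Append the raw records to the master table
  persistInMaster(streamCtx, lines)
  // Aggregate the same batches into the real-time view
  generateRealTimeView(streamCtx, lines)
  streamCtx.start()
  streamCtx.awaitTermination()
}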
Perform the following steps to code the batch layer in Scala:
Open the Spark-Examples project in Eclipse, and add a new Scala package and object named chapter.ten.batch.LAGenerateBatchViewJob.scala. LAGenerateBatchViewJob will be our Spark batch job, which will simply get the data from the master table, group the records based on the crime type, and persist the grouped records into the batch view table (batchviewdata) in Cassandra. Apart from defining SparkContext and SparkConf in the main method, we will define and invoke one more function called generateMasterView(SparkContext):

// Requires: import com.datastax.spark.connector.cql.CassandraConnector
// and org.apache.spark.sql.{SQLContext, SaveMode}
def generateMasterView(sparkCtx: SparkContext) {
  // Define the keyspace
  val keyspaceName = "lambdaarchitecture"
  // Define the master (append-only) table
  val csMasterTableName = "masterdata"
  // Define the real-time table
  val csRealTimeTableName = "realtimedata"
  // Define the table for persisting the batch view data
  val csBatchViewTableName = "batchviewdata"
  // Get an instance of Spark SQL
  val sqlCtx = getInstance(sparkCtx)
  // Load the data from the "masterdata" table
  val df = sqlCtx.read.format("org.apache.spark.sql.cassandra")
    .options(Map("table" -> "masterdata", "keyspace" -> "lambdaarchitecture")).load()
  // Apply the standard DataFrame function for grouping the crime data by primary type
  val batchView = df.groupBy("primarytype").count()
  // Persist the grouped data into the batch view table
  batchView.write.format("org.apache.spark.sql.cassandra")
    .options(Map("table" -> "batchviewdata", "keyspace" -> "lambdaarchitecture"))
    .mode(SaveMode.Overwrite).save()
  // Delete the data from the real-time table, as it is now already
  // part of the grouping done in the previous steps
  val csConnector = CassandraConnector.apply(sparkCtx.getConf)
  val csSession = csConnector.openSession()
  csSession.execute("TRUNCATE " + keyspaceName + "." + csRealTimeTableName)
  csSession.close()
  println("Data Persisted in the Batch View Table - lambdaarchitecture.batchviewdata")
}

// Singleton SQLContext variable
@transient private var instance: SQLContext = null

// Lazy initialization of the SQLContext
def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
  if (instance == null) {
    instance = new SQLContext(sparkContext)
  }
  instance
}
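The main method itself is not shown above; a minimal sketch, with the application name as an assumption, could look like this:

import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]) {
  // The master URL is supplied via spark-submit, so only the name is set here
  val conf = new SparkConf().setAppName("LAGenerateBatchViewJob")
  val sparkCtx = new SparkContext(conf)
  // Rebuild the batch view from masterdata and truncate the real-time view
  generateMasterView(sparkCtx)
  sparkCtx.stop()
}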
Perform the following steps to code the serving layer in Scala:
Open the Spark-Examples project in Eclipse, and add a new Scala package and object called chapter.ten.serving.LAServingJob.scala. LAServingJob again is a batch job, which gets the data from both the Cassandra tables, batchviewdata and realtimedata, and groups them to present the final records to the consumer. Apart from defining SparkContext and SparkConf in the main method, we will define and invoke one more function called generatefinalView(SparkContext):

// getInstance is the same lazy SQLContext helper defined in the previous jobs
def generatefinalView(sparkCtx: SparkContext) {
  // Define the keyspace
  val keyspaceName = "lambdaarchitecture"
  // Define the real-time table
  val csRealTimeTableName = "realtimedata"
  // Define the batch view table
  val csBatchViewTableName = "batchviewdata"
  // Get an instance of Spark SQL
  val sqlCtx = getInstance(sparkCtx)
  // Load the data from the "batchviewdata" table
  val batchDf = sqlCtx.read.format("org.apache.spark.sql.cassandra")
    .options(Map("table" -> "batchviewdata", "keyspace" -> "lambdaarchitecture")).load()
  // Load the data from the "realtimedata" table
  val realtimeDF = sqlCtx.read.format("org.apache.spark.sql.cassandra")
    .options(Map("table" -> "realtimedata", "keyspace" -> "lambdaarchitecture")).load()
  // Select only the primary type and count from the real-time view
  val seRealtimeDF = realtimeDF.select("primarytype", "count")
  // Merge/union both views ("batchviewdata" and "realtimedata") and
  // produce/print the final output on the console
  println("Final View after merging Batch and Real-Time Views")
  val finalView = batchDf.unionAll(seRealtimeDF).groupBy("primarytype").sum("count")
  finalView.show(20)
}
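The driver mirrors the batch job; a minimal sketch, again with the application name assumed:

import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]) {
  val conf = new SparkConf().setAppName("LAServingJob")
  val sparkCtx = new SparkContext(conf)
  // Merge the batch and real-time views and print the result on the console
  generatefinalView(sparkCtx)
  sparkCtx.stop()
}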
We are done with all the layers of our Lambda Architecture! Now, let's move to the next section, where we will execute all the layers one by one and see the results.
Let's perform the following steps and bring up all our services:
Compile and package the Spark-Examples project as spark-examples.jar and save it in a directory. Refer to this directory as <LA-HOME>. Next, open a Linux console, browse to <LA-HOME>, and execute the following command to start the producer:

java -classpath spark-examples.jar chapter.ten.producer.CrimeProducer 9047
Open another Linux console, browse to <LA-HOME>, and execute the following command to bring up our real-time job:

$SPARK_HOME/bin/spark-submit --class chapter.ten.dataConsumption.LADataConsumptionJob --master spark://ip-10-149-132-99:7077 spark-examples.jar 9047
As soon as the streaming job starts consuming data, both the Cassandra tables, masterdata and realtimedata, will be populated. Now, in order to browse the data, we can open a new Linux console and execute the following commands:

$CASSANDRA_HOME/bin/cqlsh
SELECT * FROM lambdaarchitecture.masterdata;
The output is shown in the following screenshot:
The preceding screenshot shows the output of the master table in Cassandra. We can also execute SELECT * FROM lambdaarchitecture.realtimedata; in the same CQLSH window to see the data in the real-time table.
Next, we will execute our batch view job, LAGenerateBatchViewJob. This is a manual job, which can be scheduled to execute at regular intervals, but, for the sake of simplicity, we will trigger it by hand with the following command on a new Linux console:

$SPARK_HOME/bin/spark-submit --class chapter.ten.batch.LAGenerateBatchViewJob --master spark://ip-10-149-132-99:7077 spark-examples.jar
As soon as we execute the preceding command, our batchviewdata table in Cassandra will be populated with the latest data. We can execute SELECT * FROM lambdaarchitecture.batchviewdata; in our CQLSH window to browse the data:
The preceding screenshot shows the aggregated crime data in the batchviewdata table.
Finally, we will execute our serving layer job, LAServingJob, which again runs on user request. This job will merge the batch and real-time views and present a single output. We can open a new Linux console and execute the following command:

$SPARK_HOME/bin/spark-submit --class chapter.ten.serving.LAServingJob --master spark://ip-10-149-132-99:7077 spark-examples.jar
As soon as we execute the preceding command, it will produce and print the merged view of the data residing in batchviewdata and realtimedata, which will be similar to the following screenshot:
We are done with the execution of all the layers of our Lambda Architecture!
We have only touched the tip of the iceberg here, discussing the overall concept and objective of Lambda Architecture. Architectures developed on the principles of Lambda can be much more complex and thought-provoking, and are interesting to discuss and solve, but that is beyond the scope of this book and chapter.
In this section, we have realized the Lambda Architecture using Spark and Cassandra. We have developed all the layers of Lambda Architecture using Spark Streaming and Spark batches, and also leveraged and integrated Apache Cassandra with Spark for persisting the data.
Do write back or e-mail me at <[email protected]> with any queries or use cases regarding Lambda Architecture, and I will be happy to work with you on solving or implementing them.