In this section, we will leverage the various functions exposed by the RDD API to analyze our Chicago crime dataset. We will start with simple operations and move on to more complex transformations. First, let's create/define some base classes, and then we will develop our transformation logic.
Perform the following steps to write the basic building blocks:
1. Open the Spark-Examples project and create a new Scala class by the name of chapter.seven.ScalaCrimeUtil.scala. This class will contain some utility functions that will be utilized by our main transformation job.
2. Open ScalaCrimeUtil.scala and add the following piece of code:

package chapter.seven

class ScalaCrimeUtil extends Serializable {

  /**
   * Create a Map of the data which is extracted by applying a Regular expression.
   */
  def createDataMap(data: String): Map[String, String] = {
    //Replacing Empty columns with blank Spaces,
    //so that the split function always produces the same size Array
    val crimeData = data.replaceAll(",,,", ", , , ")
    //Splitting the Single Crime record
    val array = crimeData.split(",")
    //Creating the Map of values
    val dataMap = Map[String, String](
      ("ID" -> array(0)),
      ("Case Number" -> array(1)),
      ("Date" -> array(2)),
      ("Block" -> array(3)),
      ("IUCR" -> array(4)),
      ("Primary Type" -> array(5)),
      ("Description" -> array(6)),
      ("Location Description" -> array(7)),
      ("Arrest" -> array(8)),
      ("Domestic" -> array(9)),
      ("Beat" -> array(10)),
      ("District" -> array(11)),
      ("Ward" -> array(12)),
      ("Community Area" -> array(13)),
      ("FBI Code" -> array(14)),
      ("X Coordinate" -> array(15)),
      ("Y Coordinate" -> array(16)),
      ("Year" -> array(17)),
      ("Updated On" -> array(18)),
      ("Latitude" -> array(19)),
      ("Longitude" -> array(20).concat(array(21)))
    )
    //Finally returning it to the invoking program
    return dataMap
  }
}
The preceding code defines a utility function, createDataMap(data: String), that converts a single record of the crime dataset into a map of key/value pairs.
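As a quick illustration, the utility could be exercised as follows; the sample row below is a synthetic, simplified record with 22 comma-separated columns (real rows may contain quoted fields with embedded commas), so treat it only as a sketch of the returned map's shape:

val util = new ScalaCrimeUtil()
//Synthetic 22-column record, roughly mirroring the Chicago crime CSV layout
val sampleRow = "1001,HX100001,08/01/2015 10:00:00 PM,001XX N STATE ST," +
  "0820,THEFT,$500 AND UNDER,STREET,false,false,0111,001,42,32,06," +
  "1176352,1901851,2015,08/15/2015 12:00:00 PM,41.88,-87.62,(41.88 -87.62)"
val dataMap = util.createDataMap(sampleRow)
println(dataMap("Primary Type"))  //prints: THEFT
println(dataMap("IUCR"))          //prints: 0820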
3. Next, create a new Scala object by the name of chapter.seven.ScalaTransformCrimeData.scala and add the following code:

package chapter.seven

import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.rdd._
import org.apache.hadoop._
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce._

/**
 * Transformation Job for showcasing different transformations on the Crime Dataset.
 * @author Sumit Gupta
 */
object ScalaTransformCrimeData {

  def main(args: Array[String]) {
    println("Creating Spark Configuration")
    //Create an Object of Spark Configuration
    val conf = new SparkConf()
    //Set the logical and user defined Name of this Application
    conf.setAppName("Scala - Transforming Crime Dataset")

    println("Creating Spark Context")
    //Create a Spark Context and provide the previously created
    //Object of SparkConf as a reference.
    val ctx = new SparkContext(conf)

    //Define the location of the file containing the Crime Data
    val file = "file:///home/ec2-user/softwares/crime-data/Crimes_-Aug-2015.csv";

    println("Loading the Dataset and will further process it")
    //Loading the Text file from the local file system or HDFS
    //and converting it into RDD.
    //SparkContext.textFile(..) - It uses Hadoop's TextInputFormat
    //and the file is broken by the New line Character.
    //Refer to http://hadoop.apache.org/docs/r2.6.0/api/org/apache/hadoop/mapred/TextInputFormat.html
    //The Second Argument is the number of Partitions, which specifies the parallelism.
    //It should be equal to or more than the number of Cores in the cluster.
    val logData = ctx.textFile(file, 2)

    //Now Perform the Transformations on the Data Loaded by Spark
    executeTransformations(ctx, logData)

    //Stop the Context for Graceful Shutdown
    ctx.stop()
  }

  /**
   * Main Function for invoking all kinds of Transformations on the Crime Data
   */
  def executeTransformations(ctx: SparkContext, crimeData: RDD[String]) {

  }
}
The preceding code loads the crime dataset and then defines and invokes a new method, executeTransformations. This method is the central point for invoking or executing any transformations applied to our Chicago crime dataset. It accepts the dataset loaded by Spark in the form of RDD[String], on which further analysis is performed to find answers to the questions asked by businesses, customers, market analysts, and others. Next, we will define some questions/scenarios and provide answers/solutions to them by leveraging various Spark transformations and actions.
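Each of the scenarios below adds one function to ScalaTransformCrimeData. Purely as an orientation aid, a possible final shape of executeTransformations, once all the scenario functions described later in this section have been defined, might look like the following sketch (invoke only the functions you have actually added):

def executeTransformations(ctx: SparkContext, crimeData: RDD[String]) {
  //Scenario 1 - Count of crimes grouped by "Primary Type"
  findCrimeCountByPrimaryType(ctx, crimeData)
  //Scenario 2 - Top 5 crimes by "Primary Type"
  findTop5Crime(ctx, crimeData)
  //Scenario 3 - Custom sorting on the type of crime
  performSortOnCrimeType(ctx, crimeData)
  //Scenarios 4 and 5 - Persist the filtered data (local filesystem / HDFS)
  persistCrimeData(ctx, crimeData)
  //Scenario 6 - Count of crimes grouped by IUCR codes
  findCrimeCountByIUCRCodes(ctx, crimeData)
}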
Scenario 1
How do we find the total count of crimes registered in the month of August 2015, grouped by the type of crime?
Solution 1
The solution is simple. We first need to convert the data into key/value pairs, filter the data for the Primary Type column, and then finally aggregate the data by the value of Primary Type. The result will be an RDD of key/value pairs where the key is the type of crime and the value is its count.
Let's define a new function by the name of findCrimeCountByPrimaryType
just before the closing braces of the Scala object ScalaTransformCrimeData
and add the following piece of code:
/**
 * Provide the Count of All Crimes by their "Primary Type"
 */
def findCrimeCountByPrimaryType(ctx: SparkContext, crimeData: RDD[String]) {
  //Utility class for Transforming Crime Data
  val analyzer = new ScalaCrimeUtil()
  //Flattening the Crime Data by converting it into a Map of Key/Value Pairs
  val crimeMap = crimeData.flatMap(x => analyzer.createDataMap(x))
  //Performing 3 Steps:
  //1. Filtering the Data and fetching data only for "Primary Type"
  //2. Creating a Map of Key/Value Pairs
  //3. Applying the reduce function for getting the count of each key
  val results = crimeMap.filter(f => f._1.equals("Primary Type")).
    map(x => (x._2, 1)).reduceByKey(_ + _)
  //Printing the unsorted results on the Console
  println("Printing the Count by the Type of Crime")
  results.collect().foreach(f => println(f._1 + "=" + f._2))
}
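A brief design note: reduceByKey combines values for the same key on each executor before shuffling, so only one record per distinct crime type per partition travels over the network. If the counts are only needed on the driver, an alternative is the countByValue() action, which returns a local Map rather than an RDD; a minimal sketch, assuming the same crimeMap as in the listing above:

//Counts each distinct "Primary Type" value and returns the result to the driver
val countsOnDriver: scala.collection.Map[String, Long] =
  crimeMap.filter(f => f._1.equals("Primary Type"))
    .map(_._2)
    .countByValue()
countsOnDriver.foreach { case (crimeType, count) => println(crimeType + "=" + count) }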
Next, invoke the previous function from the executeTransformations(…)
method and we are done. For executing the job, perform the following steps:
1. Compile and package the project into a .jar file, name it spark-examples.jar, and save this .jar file in the root of $SPARK_HOME.
2. Open your console, browse to $SPARK_HOME, and execute the following command:

$SPARK_HOME/bin/spark-submit --class chapter.seven.ScalaTransformCrimeData --master spark://ip-10-166-191-242:7077 spark-examples.jar
In the preceding command, ensure that the value given to parameter --master
is the same as the one shown on your Spark UI.
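If you first want to try the job on a single machine without a standalone cluster, a possible alternative is to submit it in local mode; everything else stays the same:

$SPARK_HOME/bin/spark-submit --class chapter.seven.ScalaTransformCrimeData --master local[2] spark-examples.jar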
As soon as you execute the preceding command, Spark will execute the provided transformation functions and finally print the results on the driver console, which would be similar to the following screenshot:
Easy, isn't it? Let's move on to the next scenario and perform some real analysis!
Scenario 2
How do we uncover the top five crimes committed in Chicago in August 2015?
Solution 2
The solution is again simple. We need to perform all the steps we performed in Solution 1, then sort the resulting counts in descending order, take the top five, and finally print them on the console. Let's define a new function by the name of findTop5Crime(…)
just before the closing braces of the Scala object ScalaTransformCrimeData
and add the following piece of code:
/**
 * Find the Top 5 Crimes by their "Primary Type"
 */
def findTop5Crime(ctx: SparkContext, crimeData: RDD[String]) {
  //Utility class for Transforming Crime Data
  val analyzer = new ScalaCrimeUtil()
  //Flattening the Crime Data by converting it into a Map of Key/Value Pairs
  val crimeMap = crimeData.flatMap(x => analyzer.createDataMap(x))
  //Performing 3 Steps:
  //1. Filtering the Data and fetching data only for "Primary Type"
  //2. Creating a Map of Key/Value Pairs
  //3. Applying the reduce function for getting the count of each key
  val results = crimeMap.filter(f => f._1.equals("Primary Type")).
    map(x => (x._2, 1)).reduceByKey(_ + _)
  //Perform Sort based on the Count
  val sortedResults = results.sortBy(f => f._2, false)
  //Collect the Sorted results and print the Top 5 Crimes
  println("Printing Sorted Top 5 Crimes based on the Primary Type of Crime")
  sortedResults.collect().take(5).foreach(f => println(f._1 + "=" + f._2))
}
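As a design note, sortBy triggers a shuffle and collect() brings every distinct crime type to the driver before take(5) is applied. When only the top N elements are needed, the top action can be used instead; a small sketch, assuming the same results RDD as in the listing above:

//top() keeps the N largest elements per partition and merges them on the driver,
//avoiding a full sort of the RDD
val top5 = results.top(5)(Ordering.by(_._2))
top5.foreach(f => println(f._1 + "=" + f._2))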
Now invoke the preceding function from the executeTransformations(…)
method and we are done. For executing the job, perform the same steps as we did earlier in Solution 1. As soon as we execute the job, Spark will execute the provided transformation functions and finally print the results on the driver console, which would be similar to the following screenshot:
Scenario 3
How do we find the total count of crimes in the month of August, grouped and sorted by the type of crime (Primary Type)?
Solution 3
The solution of this scenario is pretty obvious. We need to perform custom sorting on the type of crime. In order to achieve that, we need to perform all the steps we performed in Solution 1 and then apply the custom sorting on the crime type. Let's define a new function by the name of performSortOnCrimeType(…)
just before the closing braces of the Scala object ScalaTransformCrimeData
and add the following piece of code:
/**
 * Provide Custom Sorting on the type of Crime "Primary Type"
 */
def performSortOnCrimeType(ctx: SparkContext, crimeData: RDD[String]) {
  //Utility class for Transforming Crime Data
  val analyzer = new ScalaCrimeUtil()
  //Flattening the Crime Data by converting it into a Map of Key/Value Pairs
  val crimeMap = crimeData.flatMap(x => analyzer.createDataMap(x))
  //Performing 3 Steps:
  //1. Filtering the Data and fetching data only for "Primary Type"
  //2. Creating a Map of Key/Value Pairs
  //3. Applying the reduce function for getting the count of each key
  val results = crimeMap.filter(f => f._1.equals("Primary Type")).
    map(x => (x._2, 1)).reduceByKey(_ + _)
  //Perform Custom Sort based on the Type of Crime (Primary Type)
  import scala.reflect.classTag
  val customSortedResults = results.sortBy(f => createCrimeObj(f._1, f._2), true)(CrimeOrdering, classTag[Crime])
  //Collect the Sorted results and print them on the Console
  println("Now Printing Sorted Results using Custom Sorting..............")
  customSortedResults.collect().foreach(f => println(f._1 + "=" + f._2))
}

/**
 * Case Class which defines the Crime Object
 */
case class Crime(crimeType: String, count: Int)

/**
 * Utility Function for creating an Object of Class Crime
 */
val createCrimeObj = (crimeType: String, count: Int) => {
  Crime(crimeType, count)
}

/**
 * Custom Ordering function which defines the Sorting behavior.
 */
implicit val CrimeOrdering = new Ordering[Crime] {
  def compare(a: Crime, b: Crime): Int = a.crimeType.compareTo(b.crimeType)
}
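The same pattern makes it easy to change the sort criterion: only the Ordering has to change. For example, a possible drop-in replacement for CrimeOrdering that sorts by the crime count in descending order (keeping only one implicit Ordering[Crime] in scope to avoid ambiguity) could look like this:

//Sorts by count, highest first; ties are broken alphabetically by crime type
implicit val CrimeCountOrdering: Ordering[Crime] = new Ordering[Crime] {
  def compare(a: Crime, b: Crime): Int = {
    val byCount = Ordering.Int.compare(b.count, a.count)
    if (byCount != 0) byCount else a.crimeType.compareTo(b.crimeType)
  }
}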
Now, invoke the preceding function from the executeTransformations(…)
method and we are done. For executing the job, perform the same steps we used earlier in Solution 1. As soon as we execute the job, Spark will execute the provided transformation functions and finally print the results on the driver console, which will be similar to the following screenshot:
Scenario 4
How do we persist the filtered crime data map either as a text file or an object file on the local filesystem?
Solution 4
We need to perform all the steps we did in Solution 1 and then invoke saveAsTextFile(path:String)
and saveAsObjectFile(path:String)
. Let's define a new function by the name of persistCrimeData(…)
just before the closing braces of the Scala object ScalaTransformCrimeData
and add the following piece of code:
/**
 * Persist the filtered Crime Data Map into various formats (Text/Object/HDFS)
 */
def persistCrimeData(ctx: SparkContext, crimeData: RDD[String]) {
  //Utility class for Transforming Crime Data
  val analyzer = new ScalaCrimeUtil()
  //Flattening the Crime Data by converting it into a Map of Key/Value Pairs
  val crimeMap = crimeData.flatMap(x => analyzer.createDataMap(x))
  //Performing 3 Steps:
  //1. Filtering the Data and fetching data only for "Primary Type"
  //2. Creating a Map of Key/Value Pairs
  //3. Applying the reduce function for getting the count of each key
  val results = crimeMap.filter(f => f._1.equals("Primary Type")).
    map(x => (x._2, 1)).reduceByKey(_ + _)

  println("Now Persisting as Text File")
  //Ensure that the Path on the local file system exists up to the "output" folder.
  results.saveAsTextFile("file:///home/ec2-user/softwares/crime-data/output/Crime-TextFile" + System.currentTimeMillis())

  println("Now Persisting as Object File")
  //Ensure that the Path on the local file system exists up to the "output" folder.
  results.saveAsObjectFile("file:///home/ec2-user/softwares/crime-data/output/Crime-ObjFile" + System.currentTimeMillis())
}
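The object file stores the (crimeType, count) pairs as serialized objects inside a Hadoop SequenceFile, so it is meant to be read back through Spark rather than opened directly. A minimal sketch of reading it back, assuming it is placed inside ScalaTransformCrimeData (where ctx and the RDD import are available) and that the path points at whichever Crime-ObjFile<timestamp> directory the job produced:

//Read the previously written object file back into an RDD of (crimeType, count) pairs
val restored: RDD[(String, Int)] =
  ctx.objectFile[(String, Int)]("file:///home/ec2-user/softwares/crime-data/output/Crime-ObjFile<timestamp>")
restored.collect().foreach(f => println(f._1 + "=" + f._2))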
Now, invoke the preceding function from the executeTransformations(…)
method and we are done. For executing the job, perform the same steps as we did earlier in Solution 1.
Scenario 5
How do we persist data in Hadoop HDFS?
Solution 5
In order to persist data on Hadoop HDFS, first we need to set up Hadoop. So let's perform the following steps to set up Hadoop and HDFS:
1. Point HADOOP_PREFIX to the directory where Hadoop is extracted:

export HADOOP_PREFIX=<path of your directory where we extracted Hadoop>

2. Format the namenode and start the HDFS services by executing the following commands:

$HADOOP_PREFIX/bin/hdfs namenode -format
$HADOOP_PREFIX/sbin/start-dfs.sh

The first command formats the namenode and makes the filesystem ready for use. With the second command, we are starting the minimum required Hadoop services, which will include the namenode and secondary namenode.

3. Create the HDFS directories that our Spark job will write to by executing the following commands:

$HADOOP_PREFIX/bin/hdfs dfs -mkdir /spark
$HADOOP_PREFIX/bin/hdfs dfs -mkdir /spark/crime-data
$HADOOP_PREFIX/bin/hdfs dfs -mkdir /spark/crime-data/oldApi
$HADOOP_PREFIX/bin/hdfs dfs -mkdir /spark/crime-data/newApi
If everything goes right and there are no exceptions then open your browser, browse to http://localhost:50070/explorer.html#/
and you will be able to see the empty directories created by the preceding commands, as shown in the following screenshot:
The preceding screenshot shows the HDFS file system explorer where we can browse, view, and download any of the files created by the users using HDFS APIs.
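If you prefer the command line over the web UI, the same directory structure can also be verified with the HDFS shell (a quick check, assuming the preceding mkdir commands succeeded):

$HADOOP_PREFIX/bin/hdfs dfs -ls -R /spark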
As we are done with the Hadoop installation, let's modify our existing function persistCrimeData(…)
and add the following piece of code just before the closing braces:
//Creating an Object of Hadoop Config with default Values
val hConf = new JobConf(new org.apache.hadoop.conf.Configuration())

//Defining the TextOutputFormat using the old APIs available with <=0.20
val oldClassOutput = classOf[org.apache.hadoop.mapred.TextOutputFormat[Text, Text]]

//Invoking Output operation to save data in HDFS using the old APIs
//This method accepts the following Parameters:
//1. Path of the File on HDFS
//2. Key - Class which can work with the Key
//3. Value - Class which can work with the Value
//4. OutputFormat - Class needed for writing the Output in a specific Format
//5. HadoopConfig - Object of Hadoop Config
println("Now Persisting as Hadoop File using Hadoop's Old APIs")
results.saveAsHadoopFile("hdfs://localhost:9000/spark/crime-data/oldApi/Crime-" + System.currentTimeMillis(),
  classOf[Text], classOf[Text], oldClassOutput, hConf)

//Defining the TextOutputFormat using the new APIs available with >0.20
val newTextOutputFormat = classOf[org.apache.hadoop.mapreduce.lib.output.TextOutputFormat[Text, Text]]

//Invoking Output operation to save data in HDFS using the new APIs
//This method accepts the same set of parameters as "saveAsHadoopFile"
println("Now Persisting as Hadoop File using Hadoop's New APIs")
results.saveAsNewAPIHadoopFile("hdfs://localhost:9000/spark/crime-data/newApi/Crime-" + System.currentTimeMillis(),
  classOf[Text], classOf[Text], newTextOutputFormat, hConf)
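As a side note (not part of the original listing), if you simply need the pairs written as plain text and do not care about choosing a specific Hadoop OutputFormat, saveAsTextFile also accepts an HDFS URI; a minimal sketch, assuming the same results RDD and the same NameNode address:

//Writes each (crimeType, count) tuple as one line of text directly to HDFS
results.saveAsTextFile("hdfs://localhost:9000/spark/crime-data/Crime-Text-" + System.currentTimeMillis())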
For executing the job, perform the same steps as we did earlier in Solution 1 and as soon as we execute our job, we will see the data files being generated and persisted in Hadoop HDFS:
The preceding screenshot shows the files being generated by our Spark job in HDFS directories.
Scenario 6
How do we find the total count of crimes registered in the month of August, grouped by the IUCR code names?
Solution 6
Illinois Uniform Crime Reporting (IUCR) codes are the standard crime codes used by the Chicago Police Department. They need to be downloaded from http://data.cityofchicago.org/Public-Safety/Chicago-Police-Department-Illinois-Uniform-Crime-R/c7ck-438e and stored under the name IUCRCodes.txt in the same directory where we have stored our Chicago crime data. The solution to this problem is a two-step process: first, we group the crimes based on their IUCR codes, and second, we join the IUCR codes with their real names from the IUCRCodes.txt file.
Perform the following steps to implement the solution:
1. Define a new function in ScalaCrimeUtil.scala by the name of createIUCRDataMap(…). This function will convert the IUCR data file into a map of key/value pairs. Next, add the following code in createIUCRDataMap(…):

/**
 * Create a Map of the IUCR data which is extracted by applying a Regular expression.
 */
def createIUCRDataMap(data: String): Map[String, String] = {
  //Replacing Empty columns with blank Spaces,
  //so that the split function always produces the same size Array
  val iucrData = data.replaceAll(",,,", ", , , ")
  //Splitting the Single IUCR record
  val array = iucrData.split(",")
  //Creating the Map of values "IUCR Code = Value"
  val iucrDataMap = Map[String, String](
    (array(0) -> array(1))
  )
  //Finally returning it to the invoking program
  return iucrDataMap
}
2. Define a new function by the name of findCrimeCountByIUCRCodes(…) just before the closing braces of the Scala object ScalaTransformCrimeData and add the following piece of code:

/**
 * Find the Crime Count by IUCR Codes and also display the IUCR Code Names
 */
def findCrimeCountByIUCRCodes(ctx: SparkContext, crimeData: RDD[String]) {
  //Utility class for Transforming Crime Data
  val analyzer = new ScalaCrimeUtil()
  //Flattening the Crime Data by converting it into a Map of Key/Value Pairs
  val crimeMap = crimeData.flatMap(x => analyzer.createDataMap(x))
  //Performing 3 Steps:
  //1. Filtering the Data and fetching data only for "IUCR"
  //2. Creating a Map of Key/Value Pairs
  //3. Applying the reduce function for getting the count of each key
  val results = crimeMap.filter(f => f._1.equals("IUCR")).
    map(x => (x._2, 1)).reduceByKey(_ + _)

  //Loading the IUCR Codes File in Spark Memory
  val iucrFile = "file:///home/ec2-user/softwares/crime-data/IUCRCodes.txt";
  println("Loading the IUCR Dataset and will further process it")
  val iucrCodes = ctx.textFile(iucrFile, 2)

  //Convert IUCR Codes into a map of Values
  val iucrCodeMap = iucrCodes.flatMap(x => analyzer.createIUCRDataMap(x))

  //Apply Left Outer Join to get all results from the Crime RDD
  //and matching records from the IUCR RDD
  val finalResults = results.leftOuterJoin(iucrCodeMap)

  //Finally Print the results
  finalResults.collect().foreach(f => println("" + f._1 + "=" + f._2))
}
And we are done. The next step is to execute the preceding piece of code. Perform the same set of steps as we did to execute Solution 1. Once we execute the job, the outcome of the job will be similar to the following screenshot:
The preceding screenshot shows the output of the Spark job which merges two different datasets and then prints the result on the driver console.
Some of the IUCR code names are printed as None because there are no matching codes in the IUCR codes file (IUCRCodes.txt). As we are using leftOuterJoin, all records from the left-hand side dataset (crime records) are retained, while only matching records are taken from the right-hand side dataset (IUCR codes); any unmatched records are marked as None.
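Since leftOuterJoin wraps the right-hand side value in an Option, the None entries can be replaced with a friendlier label before printing; a small sketch, assuming the finalResults RDD from the listing above (the "UNKNOWN" label is just an illustrative choice):

//finalResults has the type RDD[(String, (Int, Option[String]))]
finalResults.map { case (iucrCode, (count, codeName)) =>
  (iucrCode, count, codeName.getOrElse("UNKNOWN"))
}.collect().foreach { case (code, count, name) =>
  println(code + " (" + name + ")=" + count)
}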
In this section, we discussed examples for performing transformations and actions to solve real-life problem statements. Let's move forward to the next section where we will discuss persistence in Spark.