Programming Spark transformations and actions

In this section, we will leverage the various functions exposed by the RDD API to analyze our Chicago crime dataset. We will start with simple operations and move on to more complex transformations. First, let's create/define some base classes, and then we will develop our transformation logic.

Perform the following steps to write the basic building blocks:

  1. We will extend our Spark-Examples project and create a new Scala class by the name of chapter.seven.ScalaCrimeUtil.scala. This class will contain some utility functions that will be utilized by our main transformation job.
  2. Open and edit ScalaCrimeUtil.scala and add the following piece of code:
    package chapter.seven
    
    class ScalaCrimeUtil extends Serializable{
      
       /**
       * Create a Map of the data which is extracted by applying Regular expression.
       */
      def createDataMap(data:String): Map[String, String] = {
        
        //Replacing empty columns with blank spaces, so that the
        //split function always produces an array of the same size
        val crimeData = data.replaceAll(",,,", ", , , ")
        //Splitting the Single Crime record
        val array = crimeData.split(",")
        //Creating the Map of values
        val dataMap = Map[String, String](
        ("ID" -> array(0)),
        ("Case Number" -> array(1)),
        ("Date" -> array(2)),
        ("Block" -> array(3)),
        ("IUCR" -> array(4)),
        ("Primary Type" -> array(5)),
        ("Description" -> array(6)),
        ("Location Description" -> array(7)),
        ("Arrest" -> array(8)),
        ("Domestic" -> array(9)),
        ("Beat" -> array(10)),
        ("District" -> array(11)),
        ("Ward" -> array(12)),
        ("Community Area" -> array(13)),
        ("FBI Code" -> array(14)),
        ("X Coordinate" -> array(15)),
        ("Y Coordinate" -> array(16)),
        ("Year" -> array(17)),
        ("Updated On" -> array(18)),
        ("Latitude" -> array(19)),
        ("Longitude" -> array(20).concat(array(21)))
        
        )
        //Finally returning it to the invoking program
        return dataMap
      }
    
    }

    The preceding code defines a utility function, createDataMap(data:String), that converts a single line of the crime dataset into key/value pairs.

  3. Next, we will create our transformation job. Create a new Scala object by the name of chapter.seven.ScalaTransformCrimeData.scala and add the following code:
    package chapter.seven
    
    import org.apache.spark.{ SparkConf, SparkContext }
    import org.apache.spark.rdd._
    import org.apache.hadoop._
    import org.apache.hadoop.mapred.JobConf
    import org.apache.hadoop.io._
    import org.apache.hadoop.mapreduce._
    
    /**
     * Transformation Job for showcasing different transformations on the Crime Dataset.
     * @author Sumit Gupta
     *
     */
    object ScalaTransformCrimeData {
    
      def main(args: Array[String]) {
        println("Creating Spark Configuration")
        //Create an Object of Spark Configuration
        val conf = new SparkConf()
        //Set the logical and user defined Name of this Application
        conf.setAppName("Scala - Transforming Crime Dataset")
        println("Creating Spark Context")
        //Create a Spark Context and provide previously created
        //Object of SparkConf as an reference.
        val ctx = new SparkContext(conf)
        //Define the location of the file containing the Crime Data
        val file = "file:///home/ec2-user/softwares/crime-data/Crimes_-Aug-2015.csv";
        println("Loading the Dataset and will further process it")
        //Loading the Text file from the local file system or HDFS
        //and converting it into RDD.
        //SparkContext.textFile(..) - It uses the Hadoop's
        //TextInputFormat and file is broken by New line Character.
        //Refer to http://hadoop.apache.org/docs/r2.6.0/api/org/apache/hadoop/mapred/TextInputFormat.html
    //The second argument is the number of partitions, which specifies the parallelism.
    //It should be equal to or more than the number of cores in the cluster.
        val logData = ctx.textFile(file, 2)
    
        //Now Perform the Transformations on the Data Loaded by Spark
        executeTransformations(ctx, logData)
       //Stop the Context for Graceful Shutdown
       ctx.stop()
    
      }
    
      /**
       * Main Function for invoking all kind of Transformation on Crime Data
       */
      def executeTransformations(ctx: SparkContext, crimeData: RDD[String]) {  }
    }

The preceding code loads the crime dataset and then defines/invokes a new method, executeTransformations. This method is the central point for invoking or executing any transformations applied to our Chicago crime dataset. It accepts the dataset loaded by Spark as an RDD[String], on which further analysis is performed to answer the questions asked by businesses, customers, market analysts, and others. We will also define some questions/scenarios and provide answers/solutions to them by leveraging various Spark transformations and actions.
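Each of the scenario-specific functions defined in the rest of this section is wired in through the executeTransformations(…) method. The following is only a minimal sketch of how its body may eventually look once all the functions described below have been added; uncomment each call as you implement the corresponding solution:

  def executeTransformations(ctx: SparkContext, crimeData: RDD[String]) {
    //Scenario 1 - Count of crimes grouped by "Primary Type"
    findCrimeCountByPrimaryType(ctx, crimeData)
    //Scenario 2 - Top 5 crimes by "Primary Type"
    //findTop5Crime(ctx, crimeData)
    //Scenario 3 - Custom sorting on the type of crime
    //performSortOnCrimeType(ctx, crimeData)
    //Scenarios 4 and 5 - Persist the filtered data as Text/Object/HDFS files
    //persistCrimeData(ctx, crimeData)
    //Scenario 6 - Count of crimes grouped by IUCR code names
    //findCrimeCountByIUCRCodes(ctx, crimeData)
  }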

Scenario 1

How do we find the total count of crimes registered in the month of August 2015, grouped by the type of crime?

Solution 1

The solution is simple. We first need to convert the data into key/value pairs, filter the data for the Primary Type column, and finally aggregate the data by Primary Type. The result will be an RDD of key/value pairs, where the key is the type of crime and the value is its count.

Let's define a new function by the name of findCrimeCountByPrimaryType just before the closing braces of the Scala object ScalaTransformCrimeData and add the following piece of code:

  /**
   * Provide the Count of All Crimes by its "Primary Type"
   */
  def findCrimeCountByPrimaryType(ctx: SparkContext, crimeData: RDD[String]) {

    //Utility class for Transforming Crime Data
    val analyzer = new ScalaCrimeUtil()

    //Flattening the Crime Data by converting into Map of Key/ Value Pair 
    val crimeMap = crimeData.flatMap(x => analyzer.createDataMap(x))

    //Performing 3 Steps:
    //1. Filtering the Data and fetching data only for "Primary Type"
    //2. Creating a Map of Key Value Pair
    //3. Applying reduce function for getting count of each key 
    val results = crimeMap.filter(f => f._1.equals("Primary Type")).
      map(x => (x._2, 1)).reduceByKey(_ + _)

    //Printing the unsorted results on the Console
    println("Printing the Count by the Type of Crime")
    results.collect().foreach(f => println(f._1 + "=" + f._2))
  }

Next, invoke the preceding function from the executeTransformations(…) method and we are done. To execute the job, perform the following steps:

  1. Leverage your IDE (Eclipse) and export your project as a .jar file, name it spark-examples.jar, and save this .jar file in the root of $SPARK_HOME.
  2. Open your Linux console, browse to $SPARK_HOME, and execute the following command:
    $SPARK_HOME/bin/spark-submit --class chapter.seven.ScalaTransformCrimeData --master spark://ip-10-166-191-242:7077 spark-examples.jar
    

In the preceding command, ensure that the value given to the --master parameter is the same as the one shown on your Spark UI.

As soon as you execute the preceding command, Spark will execute the provided transformation functions and finally print the results on the driver console, which will be similar to the following screenshot:

[Screenshot: count of crimes by type of crime printed on the driver console]

Easy, isn't it? Let's move on to the next scenario and perform some real analysis!

Note

Follow the comments provided in the code to understand each statement and transformation performed. The same style is used to explain all the other scenarios. The corresponding Java implementation of this problem statement can be found in the code examples provided with this book.

Scenario 2

How do we find the top five crimes committed in Chicago in August 2015?

Solution 2

The solution is again simple. We need to perform all the steps from Solution 1, sort the resulting counts in descending order, take the top five, and finally print them on the console. Let's define a new function by the name of findTop5Crime(…) just before the closing braces of the Scala object ScalaTransformCrimeData and add the following piece of code:

  /**
   * Find the Top 5 Crimes by its "Primary Type"
   */
  def findTop5Crime(ctx: SparkContext, crimeData: RDD[String]) {

    //Utility class for Transforming Crime Data
    val analyzer = new ScalaCrimeUtil()

    //Flattening the Crime Data by converting into Map of Key/ Value Pair 
    val crimeMap = crimeData.flatMap(x => analyzer.createDataMap(x))

    //Performing 3 Steps:
    //1. Filtering the Data and fetching data only for "Primary Type"
    //2. Creating a Map of Key Value Pair
    //3. Applying reduce function for getting count of each key 
    val results = crimeMap.filter(f => f._1.equals("Primary Type")).
      map(x => (x._2, 1)).reduceByKey(_ + _)

    //Perform Sort based on the Count
    val sortedResults = results.sortBy(f => f._2, false)
    //Collect the Sorted results and print the Top 5 Crime
    println("Printing Sorted Top 5 Crime based on the Primary Type of Crime")
    sortedResults.collect().take(5).foreach(f => println(f._1 + "=" + f._2))

  }

Now invoke the preceding function from the executeTransformations(…) method and we are done. For executing the job, perform the same steps as we did earlier in Solution 1. As soon as we execute the job, Spark will execute the provided transformation functions and finally print the results on the driver console, which would be similar to the following screenshot:

[Screenshot: top five crimes by count printed on the driver console]
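As a side note, when only the top five entries are needed, the explicit sort and full collect can be avoided by using the RDD's built-in top(…) action, which returns the largest elements according to a given ordering. The following is a minimal sketch, assuming the same results RDD of (crime type, count) pairs computed in findTop5Crime(…):

    //Fetch the Top 5 entries directly, ordered by their count,
    //without materializing a fully sorted RDD
    val top5Crimes = results.top(5)(Ordering.by[(String, Int), Int](_._2))
    top5Crimes.foreach(f => println(f._1 + "=" + f._2))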

Scenario 3

How do we find the total count of crimes in the month of August, grouped and sorted by the type of crime (Primary Type)?

Solution 3

The solution to this scenario is pretty obvious. We need to perform custom sorting on the type of crime. In order to achieve that, we need to perform all the steps from Solution 1 and then apply custom sorting on the crime type. Let's define a new function by the name of performSortOnCrimeType(…) just before the closing braces of the Scala object ScalaTransformCrimeData and add the following piece of code:

  /**
   * Provide Custom Sorting on the type of Crime "Primary Type"
   */
  def performSortOnCrimeType(ctx: SparkContext, crimeData: RDD[String]) {

    //Utility class for Transforming Crime Data
    val analyzer = new ScalaCrimeUtil()

    //Flattening the Crime Data by converting into Map of Key/ Value Pair 
    val crimeMap = crimeData.flatMap(x => analyzer.createDataMap(x))

    //Performing 3 Steps:
    //1. Filtering the Data and fetching data only for "Primary Type"
    //2. Creating a Map of Key Value Pair
    //3. Applying reduce function for getting count of each key 
    val results = crimeMap.filter(f => f._1.equals("Primary Type")).
      map(x => (x._2, 1)).reduceByKey(_ + _)

    //Perform Custom Sort based on the Type of Crime (Primary Type)
    import scala.reflect.classTag
    val customSortedResults = results.sortBy(f => createCrimeObj(f._1, f._2), true)(CrimeOrdering, classTag[Crime])
    //Collect the Sorted results and print the Top 5 Crime
    println("Now Printing Sorted Results using Custom Sorting..............")
    customSortedResults.collect().foreach(f => println(f._1 + "=" + f._2))

  }

  /**
   * Case Class which defines the Crime Object
   */
  case class Crime(crimeType: String, count: Int)

  /**
   * Utility Function for creating Object of Class Crime
   */
  val createCrimeObj = (crimeType: String, count: Int) => {
    Crime(crimeType, count)
  }

  /**
   * Custom Ordering function which defines the Sorting behavior.
   */
  implicit val CrimeOrdering = new Ordering[Crime] {
    def compare(a: Crime, b: Crime): Int = a.crimeType.compareTo(b.crimeType)
  }

Now, invoke the preceding function from the executeTransformations(…) method and we are done. For executing the job, perform the same steps we used earlier in Solution 1. As soon as we execute the job, Spark will execute the provided transformation functions and finally print the results on the driver console, which will be similar to the following screenshot:

[Screenshot: crime counts sorted alphabetically by primary type printed on the driver console]
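Incidentally, because the key of the results RDD is already the crime type, a plain alphabetical ordering can also be obtained with the built-in sortByKey(…) transformation; the custom Crime ordering shown above becomes valuable when the sorting logic is more involved than a simple string comparison. A minimal sketch, assuming the same results RDD:

    //Equivalent alphabetical sort on the crime type using sortByKey,
    //which relies on the default String ordering of the key
    val alphabeticalResults = results.sortByKey(true)
    alphabeticalResults.collect().foreach(f => println(f._1 + "=" + f._2))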

Scenario 4

How do we persist the filtered crime data map either as a text file or as an object file on the local filesystem?

Solution 4

We need to perform all the steps we did in Solution 1 and then invoke saveAsTextFile(path:String) and saveAsObjectFile(path:String). Let's define a new function by the name of persistCrimeData(…) just before the closing braces of the Scala object ScalaTransformCrimeData and add the following piece of code:

  /**
   * Persist the filtered Crime Data Map into various formats (Text/ Object/ HDFS)
   */
  def persistCrimeData(ctx: SparkContext, crimeData: RDD[String]) {

    //Utility class for Transforming Crime Data
    val analyzer = new ScalaCrimeUtil()

    //Flattening the Crime Data by converting into Map of Key/ Value Pair 
    val crimeMap = crimeData.flatMap(x => analyzer.createDataMap(x))

    //Performing 3 Steps:
    //1. Filtering the Data and fetching data only for "Primary Type"
    //2. Creating a Map of Key Value Pair
    //3. Applying reduce function for getting count of each key 
    val results = crimeMap.filter(f => f._1.equals("Primary Type")).
      map(x => (x._2, 1)).reduceByKey(_ + _)
    
    println("Now Persisting as Text File")
    //Ensure that the Path on local file system exists till "output" folder. 
    results.saveAsTextFile("file:///home/ec2-user/softwares/crime-data/output/Crime-TextFile"+System.currentTimeMillis())
    println("Now Persisting as Object File")
    //Ensure that the Path on local file system exists till "output" folder.
    results.saveAsObjectFile("file:///home/ec2-user/softwares/crime-data/output/Crime-ObjFile"+System.currentTimeMillis())
}

Now, invoke the preceding function from the executeTransformations(…) method and we are done. For executing the job, perform the same steps as we did earlier in Solution 1.
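To verify the persisted output, the saved files can be loaded back into RDDs. The following is a minimal sketch, where <timestamp> is a placeholder for the actual suffix appended at save time (not a literal path):

    //Reading the Text file back; each element is a line of the form "(key,count)"
    //<timestamp> is a placeholder for the suffix generated by System.currentTimeMillis()
    val textRDD = ctx.textFile("file:///home/ec2-user/softwares/crime-data/output/Crime-TextFile<timestamp>")
    //Reading the Object file back; the element type must match what was saved
    val objectRDD = ctx.objectFile[(String, Int)]("file:///home/ec2-user/softwares/crime-data/output/Crime-ObjFile<timestamp>")
    println("Text records = " + textRDD.count() + ", Object records = " + objectRDD.count())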

Scenario 5

How do we persist data in Hadoop HDFS?

Solution 5

In order to persist data on Hadoop HDFS, first we need to set up Hadoop. So let's perform the following steps to set up Hadoop and HDFS:

  1. Download the Hadoop 2.4.0 distribution from https://archive.apache.org/dist/hadoop/common/hadoop-2.4.0/hadoop-2.4.0.tar.gz and extract the archive to any folder of your choice on the same machine where you configured Spark.
  2. Open the Linux shell and execute the following command:
    export HADOOP_PREFIX=<path of your directory where we extracted Hadoop>
    
  3. Follow the steps defined in http://hadoop.apache.org/docs/r2.5.2/hadoop-project-dist/hadoop-common/SingleCluster.html for single-node setup. After completing the prerequisites defined in the given link, you can follow the setup instructions defined either for the pseudo-distributed mode or the fully-distributed mode. For us, the pseudo-distributed mode will work but that does not stop you from trying the latter one.
  4. Once you have completed the setup, open your Linux shell and execute the following commands:
    $HADOOP_PREFIX/bin/hdfs namenode -format
    $HADOOP_PREFIX/sbin/start-dfs.sh
    
  5. The first command will format the namenode and make the filesystem ready for use. With the second command, we start the minimum required Hadoop services, which include the namenode, the datanode, and the secondary namenode.
  6. Next, let's execute these commands to create a directory structure in HDFS where we will store our data:
    $HADOOP_PREFIX/bin/hdfs dfs -mkdir /spark
    $HADOOP_PREFIX/bin/hdfs dfs -mkdir /spark/crime-data
    $HADOOP_PREFIX/bin/hdfs dfs -mkdir /spark/crime-data/oldApi
    $HADOOP_PREFIX/bin/hdfs dfs -mkdir /spark/crime-data/newApi
    

If everything goes right and there are no exceptions, open your browser and browse to http://localhost:50070/explorer.html#/. You will be able to see the empty directories created by the preceding commands, as shown in the following screenshot:

[Screenshot: empty directories under /spark/crime-data shown in the HDFS file browser]

The preceding screenshot shows the HDFS file system explorer where we can browse, view, and download any of the files created by the users using HDFS APIs.

Note

Refer to http://hadoop.apache.org/ for more information on Hadoop and HDFS.

Now that we are done with the Hadoop installation, let's modify our existing function persistCrimeData(…) and add the following piece of code just before its closing brace:

   //Creating an Object of Hadoop Config with default Values
   val hConf = new JobConf(new org.apache.hadoop.conf.Configuration())
   
   //Defining the TextOutputFormat using old APIs available with versions <= 0.20
   val oldClassOutput = classOf[org.apache.hadoop.mapred.TextOutputFormat[Text,Text]]
   //Invoking Output operation to save data in HDFS using old APIs 
   //This method accepts the following Parameters:
   //1.Path of the File on HDFS 
   //2.Key - Class which can work with the Key  
   //3.Value - Class which can work with the Value
   //4.OutputFormat - Class needed for writing the Output in a specific Format
   //5.HadoopConfig - Object of Hadoop Config
   println("Now Persisting as Hadoop File using in Hadoop's Old APIs") 
   results.saveAsHadoopFile("hdfs://localhost:9000/spark/crime-data/oldApi/Crime-"+System.currentTimeMillis(), classOf[Text], classOf[Text], oldClassOutput ,hConf )
   
   //Defining the TextOutputFormat using new APIs available with >0.20
   val newTextOutputFormat = classOf[org.apache.hadoop.mapreduce.lib.output.TextOutputFormat[Text, Text]]
   
   //Invoking Output operation to save data in HDFS using new APIs 
   //This method accepts same set of parameters as "saveAsHadoopFile"
   println("Now Persisting as Hadoop File using in Hadoop's New APIs")
   results.saveAsNewAPIHadoopFile("hdfs://localhost:9000/spark/crime-data/newApi/Crime-"+System.currentTimeMillis(), classOf[Text], classOf[Text], newTextOutputFormat ,hConf )

For executing the job, perform the same steps as we did earlier in Solution 1 and as soon as we execute our job, we will see the data files being generated and persisted in Hadoop HDFS:

[Screenshot: output files generated by the Spark job in the HDFS directories]

The preceding screenshot shows the files being generated by our Spark job in HDFS directories.
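The persisted HDFS data can also be read back into Spark with the same textFile(…) API, as it accepts HDFS URIs and Hadoop glob patterns. A minimal sketch, assuming the namenode address used above:

    //Loading all files written under the oldApi directory; the trailing /*
    //is a Hadoop glob that matches every timestamped sub-directory
    val persistedData = ctx.textFile("hdfs://localhost:9000/spark/crime-data/oldApi/*")
    println("Records read back from HDFS = " + persistedData.count())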

Scenario 6

How do we find the total count of crimes registered in the month of August, grouped by the IUCR code names?

Solution 6

Illinois Uniform Crime Reporting (IUCR) codes are the standard crime codes used by the Chicago Police Department. They need to be downloaded from http://data.cityofchicago.org/Public-Safety/Chicago-Police-Department-Illinois-Uniform-Crime-R/c7ck-438e and stored under the name IUCRCodes.txt in the same directory where we stored our Chicago crime data. The solution to this problem is a two-step process: first, we group the crimes by IUCR code, and second, we join the IUCR codes with their real names from the IUCRCodes.txt file.

Perform the following steps to implement the solution:

  1. Define a new function in ScalaCrimeUtil.scala by the name of createIUCRDataMap(…). This function will convert the IUCR data file into a map of key/value pairs. Next, add the following code to createIUCRDataMap(…):
       /**
       * Create a Map of the data which is extracted by applying Regular expression.
       */
      def createIUCRDataMap(data:String): Map[String, String] = {
        
        //Replacing empty columns with blank spaces, so that the
        //split function always produces an array of the same size
        val iucrData = data.replaceAll(",,,", ", , , ")
        //Splitting the single IUCR record
        val array = iucrData.split(",")
        //Creating the Map of values "IUCR Codes = Values"
        val iucrDataMap = Map[String, String](
        (array(0) -> array(1))
      )
      //Finally returning it to the invoking program 
      return iucrDataMap
      }
  2. Let's define a new function by the name of findCrimeCountByIUCRCodes(…) just before the closing braces of the Scala object ScalaTransformCrimeData and add the following piece of code:
      /**
       * Find the Crime Count by IUCR Codes and also display the IUCR Code Names
       */
       def findCrimeCountByIUCRCodes(ctx: SparkContext, crimeData: RDD[String]) {
         
             //Utility class for Transforming Crime Data
        val analyzer = new ScalaCrimeUtil()
    
        //Flattening the Crime Data by converting into Map of Key/ Value Pair 
        val crimeMap = crimeData.flatMap(x => analyzer.createDataMap(x))
    
        //Performing 3 Steps:
        //1. Filtering the Data and fetching data only for "IUCR"
        //2. Creating a Map of Key Value Pair
        //3. Applying reduce function for getting count of each key
        val results = crimeMap.filter(f => f._1.equals("IUCR")).
          map(x => (x._2, 1)).reduceByKey(_ + _)
        
        
        //Loading IUCR Codes File in Spark Memory
        val iucrFile = "file:///home/ec2-user/softwares/crime-data/IUCRCodes.txt";
        println("Loading the Dataset and will further process it")
        val iucrCodes = ctx.textFile(iucrFile, 2)
        //Convert IUCR Codes into a map of Values 
        val iucrCodeMap = iucrCodes.flatMap(x => analyzer.createIUCRDataMap(x))
        //Apply Left Outer Join to get all results from Crime RDD 
        //and matching records from IUCR RDD
        val finalResults = results.leftOuterJoin(iucrCodeMap)
        
        //Finally Print the results 
        finalResults.collect().foreach(f => println(""+f._1 + "=" + f._2))
        
       }

And we are done. The next step is to execute the preceding piece of code. Perform the same set of steps as we did to execute Solution 1. Once we execute the job, the outcome of the job will be similar to the following screenshot:

[Screenshot: crime counts joined with IUCR code names printed on the driver console]

The preceding screenshot shows the output of the Spark job which merges two different datasets and then prints the result on the driver console.

Note

Some of the IUCR code names are printed as None because there are no matching codes in the IUCR code names file (IUCRCodes.txt). As we are using leftOuterJoin, all records from the left-hand side dataset (crime records) are included, while only matching records are taken from the right-hand side dataset (IUCR codes); unmatched records are marked as None.
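If the None values are not desirable in the final output, the Option returned by leftOuterJoin can be resolved before printing, for example by falling back to the raw IUCR code when no name is found. A minimal sketch, assuming the same finalResults RDD:

    //Replacing a missing IUCR code name with the raw IUCR code itself
    val resolvedResults = finalResults.map {
      case (code, (count, nameOption)) => (nameOption.getOrElse(code), count)
    }
    resolvedResults.collect().foreach(f => println(f._1 + "=" + f._2))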

In this section, we discussed examples of performing transformations and actions to solve real-life problem statements. Let's move on to the next section, where we will discuss persistence in Spark.
