Coding our first Spark SQL job

In this section, we will discuss the basics of writing Spark SQL jobs in Scala and Java. Spark SQL exposes the rich DataFrame API (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame) for loading and analyzing datasets in various forms. It not only provides operations for loading and analyzing data from structured sources such as Hive, Parquet, and RDBMS, but also offers the flexibility to load data from semi-structured formats such as JSON and CSV. In addition to the explicit operations exposed by the DataFrame API, it also facilitates the execution of SQL queries against the data loaded into Spark.
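
As a quick illustration of this flexibility, here is a minimal sketch (Scala, using the Spark 1.x-style SQLContext used throughout this chapter); the file paths, JDBC URL, and table name are hypothetical placeholders, and the JDBC driver is assumed to be on the classpath:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    val sparkCtx = new SparkContext(new SparkConf().setAppName("DataFrame Sources Sketch"))
    val sqlCtx = new SQLContext(sparkCtx)

    //JSON and Parquet readers are built in (paths are placeholders)
    //CSV support in Spark 1.x requires the external spark-csv package
    val jsonDF = sqlCtx.read.json("file:///path/to/some.json")
    val parquetDF = sqlCtx.read.parquet("file:///path/to/some.parquet")

    //Reading from an RDBMS over JDBC (connection details are hypothetical)
    val jdbcDF = sqlCtx.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/mydb")
      .option("dbtable", "employees")
      .load()

    //Any DataFrame can be registered as a temporary table and queried with SQL
    jsonDF.registerTempTable("json_data")
    sqlCtx.sql("select count(1) from json_data").show()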

Let's move ahead and code our first Spark SQL job in Scala and then we will also look at the corresponding implementation in Java.

Coding a Spark SQL job in Scala

In this section, we will code and execute our first Spark SQL job using the Scala APIs.

As this is our first Spark SQL job, we will keep it simple and use some sample data about a company's departments and their employee counts in JSON format. Consider the following data:

[  
   {  
      "Name":"DEPT_A",
      "No_Of_Emp":10,
      "No_Of_Supervisors":2
   },
   {  
      "Name":"DEPT_B",
      "No_Of_Emp":12,
      "No_Of_Supervisors":2
   },
   {  
      "Name":"DEPT_C",
      "No_Of_Emp":14,
      "No_Of_Supervisors":3
   }
]

We need to copy the preceding JSON data, save it at the same location where we saved our Chicago crime dataset, and name it company.json. Keep in mind that Spark's built-in JSON reader processes its input line by line and expects each line to be a self-contained JSON record, so if the job later reports a _corrupt_record column, save the content as a single line (or one object per line) rather than pretty-printed across multiple lines.
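
If you want to sanity-check the file before writing the full job, a quick look from spark-shell works well; the following minimal sketch assumes a Spark 1.x shell, where a sqlContext is already created for you, and uses the same path the job assumes later:

    //In spark-shell (Spark 1.x), sqlContext is pre-created
    val companyDF = sqlContext.read.json("file:///home/ec2-user/softwares/crime-data/company.json")
    //Name should be inferred as a string column, the two counts as long columns
    companyDF.printSchema()
    //Should list DEPT_A, DEPT_B, and DEPT_C
    companyDF.show()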

Next, perform the following steps to process the same data using Scala APIs:

  1. Open your Spark-Examples project and create a new chapter.eight package with a Scala object named ScalaFirstSparkSQLJob in it.
  2. Edit ScalaFirstSparkSQLJob and add the following code snippet just below the package declaration:
    import org.apache.spark.sql._
    import org.apache.spark._
    
    object ScalaFirstSparkSQLJob {
    
      def main(args: Array[String]) {
          //Defining/ Creating SparkConf Object
          val conf = new SparkConf()
          //Setting Application/ Job Name
          conf.setAppName("First Spark SQL Job in Scala")
          // Define Spark Context which we will use to initialize our SQL Context 
          val sparkCtx = new SparkContext(conf)
          //Creating SQL Context
          val sqlCtx = new SQLContext(sparkCtx)
      
          //Defining path of the JSON file which contains the data in JSON Format 
          val jsonFile = "file:///home/ec2-user/softwares/crime-data/company.json" 
          
          //Utility method exposed by SQLContext for reading JSON file 
          //and create dataFrame
          //Once the DataFrame is created, all the field names like "Name" in the JSON file 
          //will be interpreted as column names and their data types will be 
          //interpreted automatically based on their values
          val dataFrame = sqlCtx.read.json(jsonFile)
        
          //Defining a function which will execute operations 
          //exposed by DataFrame API's
          executeDataFrameOperations(sqlCtx,dataFrame)
          //Defining a function which will execute SQL Queries using 
          //DataFrame API's
          executeSQLQueries(sqlCtx,dataFrame)
        
      }
      
      /**
       * This function executes various operations exposed by DataFrame API
       */
      def executeDataFrameOperations(sqlCtx:SQLContext, dataFrame:DataFrame):Unit = {
          //Invoking various basic operations available with DataFrame 
          println("Printing the Schema...")
          dataFrame.printSchema()
          //Printing Total Rows Loaded into DataFrames
          println("Total Rows - "+dataFrame.count())
          //Printing all rows of the DataFrame
          println("Printing All Rows in the Data Frame")
          dataFrame.collect().foreach { row => println(row) }
          //Sorting the records and then printing all rows again
          //Sorting is based on the data type of the column.
          //In our case "No_Of_Supervisors" is a numeric (long) column,
          //so the rows are sorted numerically
          println("Here is the Sorted Rows by 'No_Of_Supervisors' - Descending")
          dataFrame.sort(dataFrame.col("No_Of_Supervisors").desc).show(10)
    
      }
      
      /**
       * This function registers the DataFrame as SQL Table and execute SQL Queries
       */
      def executeSQLQueries(sqlCtx:SQLContext, dataFrame:DataFrame):Unit = {
        
        //The first step is to register the DataFrame as temporary table
        //And give it a name. In our Case "Company"
        dataFrame.registerTempTable("Company")
        println("Executing SQL Queries...")
        //Now Execute the SQL Queries and print the results
        //Calculating the total Count of Rows
        val dfCount = sqlCtx.sql("select count(1) from Company")
        println("Calculating total Rows in the Company Table...")
        dfCount.collect().foreach(println)
       
        //Printing the complete data in the Company table
        val df = sqlCtx.sql("select * from Company")
        println("Dumping the complete Data of Company Table...")
        df.collect().foreach(println)
        
        //Printing the complete data in the Company table sorted by Supervisors
        val dfSorted = sqlCtx.sql("select * from Company order by No_Of_Supervisors desc")
        println("Dumping the complete Data of Company Table, sorted by Supervisors - Descending...")
        dfSorted.collect().foreach(println)
            
      }
    }

    The preceding code loads the JSON data (company.json) into a DataFrame, executes a few operations using the DataFrame API, and then performs the same kind of analysis using SQL queries; a few additional DataFrame operations you could experiment with are sketched just after these steps.

  3. Next, we will execute our ScalaFirstSparkSQLJob and see the results on the driver console. Assuming that your Spark cluster is up and running, you just need to use the same spark-submit utility to submit your job. This is the same as what we did in the Coding Spark job in Scala section in Chapter 6, Getting Acquainted with Spark. Our spark-submit command would look something like this:
    $SPARK_HOME/bin/spark-submit --class chapter.eight.ScalaFirstSparkSQLJob --master spark://ip-10-166-191-242:7077 spark-examples.jar
    

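As mentioned in step 2, here is a short, purely illustrative sketch of a few more operations exposed by the DataFrame API; it assumes the same sqlCtx and dataFrame created in ScalaFirstSparkSQLJob, and the selected columns and filter threshold are arbitrary:

    import org.apache.spark.sql.functions._

    //Selecting only specific columns
    dataFrame.select("Name", "No_Of_Emp").show()
    //Filtering rows on a condition (the threshold is arbitrary)
    dataFrame.filter(dataFrame.col("No_Of_Emp").gt(10)).show()
    //Aggregating across the whole DataFrame
    dataFrame.agg(sum("No_Of_Emp"), avg("No_Of_Supervisors")).show()
    //The equivalent SQL, once the DataFrame is registered as a temporary table
    dataFrame.registerTempTable("Company")
    sqlCtx.sql("select sum(No_Of_Emp), avg(No_Of_Supervisors) from Company").show()
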
As soon as you execute the spark-submit command, your job will start executing and will produce results similar to the following screenshot:

[Screenshot: Coding a Spark SQL job in Scala - driver console output]

The preceding screenshot shows the output of our first Spark SQL job on the driver console. Let's move towards the next section where we will code the same job using Spark Java APIs.

Coding a Spark SQL job in Java

In this section, we will discuss and write our first Spark SQL job using Spark SQL Java APIs.

Assuming that company.json, which we created in the previous section, still exists on your cluster, perform the following steps to process the same data using Java APIs:

  1. Open your Spark-Examples project and create a new Java class named JavaFirstSparkSQLJob in the chapter.eight package.
  2. Edit JavaFirstSparkSQLJob and add the following code snippet just below the package declaration:
    import org.apache.spark.*;
    import org.apache.spark.sql.*;
    import org.apache.spark.api.java.*;
    
    public class JavaFirstSparkSQLJob {
    
      public JavaFirstSparkSQLJob() {
        System.out.println("Creating Spark Configuration");
        // Create an Object of Spark Configuration
        SparkConf javaConf = new SparkConf();
        // Set the logical and user defined Name of this Application
        javaConf.setAppName("First Spark SQL Job in Java");
        System.out.println("Creating Spark Context");
        // Create a Spark Context and provide the previously created
        // object of SparkConf as a reference.
        JavaSparkContext javaCtx = new JavaSparkContext(javaConf);
    
        // Defining path of the JSON file which contains the data in JSON Format
        String jsonFile = "file:///home/ec2-user/softwares/crime-data/company.json";
    
        // Creating SQL Context
        SQLContext sqlContext = new SQLContext(javaCtx);
        // Utility method exposed by SQLContext for reading JSON file
        // and create dataFrame
        // Once DataFrame is created all the data names like "Name" in the JSON
        // file will be interpreted as Column Names and their data Types 
        // will be interpreted automatically based on their values
        DataFrame dataFrame = sqlContext.read().json(jsonFile);
        // Defining a function which will execute operations
        // exposed by DataFrame API's
        executeDataFrameOperations(sqlContext, dataFrame);
        // Defining a function which will execute SQL Queries using
        // DataFrame API's
        executeSQLQueries(sqlContext, dataFrame);
    
        //Closing Context for clean exit
        javaCtx.close();
    
      }
    
      /**
       * This function executes various operations exposed by DataFrame API
       */
      public void executeDataFrameOperations(SQLContext sqlCtx,
          DataFrame dataFrame) {
        // Invoking various basic operations available with DataFrame
        System.out.println("Printing the Schema...");
        dataFrame.printSchema();
        // Printing Total Rows Loaded into DataFrames
        System.out.println("Total Rows - " + dataFrame.count());
        // Printing the rows of the DataFrame (show() displays up to 20 rows by default)
        System.out.println("Printing All Rows in the Data Frame");
        dataFrame.show();
        // Sorting the records and then Printing all Rows again
        // Sorting is based on the data type of the column.
        // In our case "No_Of_Supervisors" is a numeric (long) column,
        // so the rows are sorted numerically
        System.out.println("Here is the Sorted Rows by 'No_Of_Supervisors' - Descending");
        DataFrame sortedDF = dataFrame.sort(dataFrame.col("No_Of_Supervisors").desc());
        sortedDF.show();
    
      }
    
      /**
       * This function registers the DataFrame as SQL Table and execute SQL
       * Queries
       */
      public void executeSQLQueries(SQLContext sqlCtx, DataFrame dataFrame) {
    
        // The first step is to register the DataFrame as temporary table
        // And give it a name. In our Case "Company"
        dataFrame.registerTempTable("Company");
        System.out.println("Executing SQL Queries...");
        // Now Execute the SQL Queries and print the results
        // Calculating the total Count of Rows
        DataFrame dfCount = sqlCtx
            .sql("select count(1) from Company");
        System.out.println("Calculating total Rows in the Company Table...");
        dfCount.show();
    
        // Printing the complete data in the Company table
        DataFrame df = sqlCtx.sql("select * from Company");
        System.out.println("Dumping the complete Data of Company Table...");
        df.show();
        // Printing the complete data in the Company table sorted by Supervisors
        DataFrame dfSorted = sqlCtx
            .sql("select * from Company order by No_Of_Supervisors desc");
        System.out
            .println("Dumping the complete Data of Company Table, sorted by Supervisors - Descending...");
        dfSorted.show();
    
      }
      public static void main(String[] args) {
        new JavaFirstSparkSQLJob();
      }
    }

    The preceding piece of code performs the same work as our Scala job, but it leverages the Java APIs.

  3. Next, we will execute JavaFirstSparkSQLJob and see the results on the driver console. Assuming that your Spark cluster is up and running, you just need to use the same spark-submit utility to submit your job. This is the same as what we did in the Coding Spark job in Scala section in Chapter 6, Getting Acquainted with Spark. Your spark-submit command would look something like this:
    $SPARK_HOME/bin/spark-submit --class chapter.eight.JavaFirstSparkSQLJob --master spark://ip-10-166-191-242:7077 spark-examples.jar
    

    As soon as you execute the preceding command, your job will start executing and the results will be similar to what we saw for our Scala job.

Finally, we are done with coding and executing our first Spark SQL job using Scala and Java APIs. In the next sections, we will discuss the nitty-gritty of Spark SQL and its various features.

Note

To avoid verbosity, going forward, we will discuss the implementations using Scala APIs.
