Working with Hive tables

In this section, we will discuss the integration of Spark SQL with Hive tables. We will see how to execute Hive queries in Spark SQL, which will help us create and analyze Hive tables in HDFS.

Spark SQL provides the flexibility of directly executing Hive queries from our Spark SQL codebase. The best part is that the Hive queries are executed on the Spark cluster, and we only require HDFS for reading and storing the Hive tables. In other words, there is no need to set up a complete Hadoop cluster with services like ResourceManager or NodeManager. We just need the HDFS services, which are available as soon as we start the NameNode and DataNode.
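For reference, a minimal way to bring up just these HDFS services from the console is shown below. This is only a sketch; it assumes a Hadoop 2.x installation with the $HADOOP_HOME environment variable pointing to it, and your actual setup may differ depending on how Hadoop was installed and configured:

    # Start only the HDFS daemons; no YARN services are required
    $HADOOP_HOME/sbin/hadoop-daemon.sh start namenode
    $HADOOP_HOME/sbin/hadoop-daemon.sh start datanode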

Perform the following steps to create Hive tables for our Chicago crime data and, at the same time, execute some analytical Hive queries against them:

  1. Open and edit the Spark-Examples project and create a Scala object named chapter.eight.ScalaSparkSQLToHive.scala.
  2. Next, edit chapter.eight.ScalaSparkSQLToHive.scala and add the following piece of code in it:
    import org.apache.spark.sql._
    import org.apache.spark._
    import org.apache.spark.sql.hive.HiveContext
    
    object ScalaSparkSQLToHive {
      
      def main(args: Array[String]) {
        
        //Defining/ Creating SparkConf Object
        val conf = new SparkConf()
        //Setting Application/ Job Name
        conf.setAppName("Spark SQL - RDD To Hive")
        // Define Spark Context which we will use to initialize our SQL Context 
        val sparkCtx = new SparkContext(conf)
        //Creating Hive Context
        val hiveCtx = new HiveContext(sparkCtx)
        //Creating the Hive Tables
        println("Creating a new Hive Table - ChicagoCrimeRecordsAug15")
        hiveCtx.sql("CREATE TABLE IF NOT EXISTS ChicagoCrimeRecordsAug15(ID STRING,CaseNumber STRING, CrimeDate STRING,Block STRING,IUCR STRING,PrimaryType STRING,Description STRING,LocationDescription STRING,Arrest STRING,Domestic STRING,Beat STRING,District STRING,Ward STRING,CommunityArea STRING,FBICode STRING,XCoordinate STRING,YCoordinate STRING,Year STRING,UpdatedOn STRING,Latitude STRING,Longitude STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' stored as textfile")
        println("Creating a new Hive Table - iucrCodes")
        hiveCtx.sql("CREATE TABLE IF NOT EXISTS iucrCodes(IUCR STRING,PRIMARY_DESC STRING ,SECONDARY_DESC STRING,INDEXCODE STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' stored as textfile")
        //Load the Data in Hive Table
        println("Loading Data in Hive Table - ChicagoCrimeRecordsAug15")
        hiveCtx.sql("LOAD DATA LOCAL INPATH '/home/ec2-user/softwares/crime-data/Crimes_-Aug-2015.csv' OVERWRITE INTO TABLE ChicagoCrimeRecordsAug15")
        println("Loading Data in Hive Table - iucrCodes")
        hiveCtx.sql("LOAD DATA LOCAL INPATH '/home/ec2-user/softwares/crime-data/IUCRCodes.csv' OVERWRITE INTO TABLE iucrCodes")
        //Quick Check on the number of records loaded in the Hive Table
        println("Quick Check on the Number of records Loaded in ChicagoCrimeRecordsAug15")
        hiveCtx.sql("select count(1) from ChicagoCrimeRecordsAug15").show()
        println("Quick Check on the Number of records Loaded in iucrCodes")
        hiveCtx.sql("select count(1) from iucrCodes").show()
        
        println("Now Performing Analysis")
        println("Top 5 Crimes in August Based on IUCR Codes")
        hiveCtx.sql("select B.PRIMARY_DESC, count(A.IUCR) as countIUCR from ChicagoCrimeRecordsAug15 A,iucrCodes B where A.IUCR=B.IUCR group by B.PRIMARY_DESC order by countIUCR desc").show(5)
      
        println("Count of Crimes which are of Type 'Domestic' and someone is 'Arrested' by the Police")
        hiveCtx.sql("select B.PRIMARY_DESC, count(A.IUCR) as countIUCR from ChicagoCrimeRecordsAug15 A,iucrCodes B where A.IUCR=B.IUCR and A.domestic='true' and A.arrest='true' group by B.PRIMARY_DESC order by countIUCR desc").show()
        
        println("Find Top 5 Community Areas where Highest number of Crimes have been Committed in Aug-2015")
        hiveCtx.sql("select CommunityArea, count(CommunityArea) as cnt from ChicagoCrimeRecordsAug15 group by CommunityArea order by cnt desc").show(5)
    
      }
    }

The preceding piece of code first creates the HiveContext and then leverages HiveQL (https://cwiki.apache.org/confluence/display/Hive/LanguageManual) for creating Hive tables, loading data into them, and finally performing various kinds of analysis.
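It is worth noting that every hiveCtx.sql(...) call returns a DataFrame, so the result of a Hive query can be cached, registered as a temporary table, or processed further with the DataFrame API instead of only being printed with show(). The following is a minimal sketch of this idea (the topCrimes value and the temporary table name are purely illustrative and are not part of the preceding code):

    //hiveCtx.sql(...) returns a DataFrame, so query results can be reused
    val topCrimes = hiveCtx.sql(
      "select B.PRIMARY_DESC, count(A.IUCR) as countIUCR " +
      "from ChicagoCrimeRecordsAug15 A, iucrCodes B " +
      "where A.IUCR = B.IUCR group by B.PRIMARY_DESC order by countIUCR desc")
    //Cache the result if it will be queried repeatedly
    topCrimes.cache()
    //Register it as a temporary table and run further SQL against it
    topCrimes.registerTempTable("topCrimes")
    hiveCtx.sql("select * from topCrimes limit 5").show()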

We are done with the coding of our Spark job; now we have to perform the following configurations before we can execute it:

  1. Ensure your Hadoop HDFS is up and running by browsing to http://<HOST_NAME>:50070/. It should display the Hadoop NameNode UI. If the URL does not show the NameNode home page, then follow the steps defined in the Programming Spark transformation and actions section in Chapter 7, Programming with RDDs, to configure Hadoop and HDFS.
  2. The next step is to configure the Apache Hive parameters in our Spark installation, which can easily be done by creating a hive-site.xml file and placing it in the $SPARK_HOME/conf folder. If you already have Hive installed, you just need to copy hive-site.xml from your Hive installation directory to the $SPARK_HOME/conf folder. If you do not, create a new file at $SPARK_HOME/conf/hive-site.xml and add the following content to it:
    <configuration>
    
      <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:derby:;databaseName=/home/ec2-user/softwares/hive-1.2.1/metastore/metastore_db;create=true</value>
        <description>JDBC connect string for a JDBC metastore</description>
      </property>
    
    </configuration>

    The preceding configuration is the bare minimum required to execute Hive queries. The property defines the location of the Hive metastore DB, which contains all the metadata about the Hive tables. We need to be careful with this property because if the metastore is deleted, then we lose the metadata and cannot access any of the Hive tables.

  3. We are done with all the configurations, and our final step is to export our Eclipse project and execute our Spark SQL job using spark-submit:
    $SPARK_HOME/bin/spark-submit --class chapter.eight.ScalaSparkSQLToHive --master spark://ip-10-184-194-147:7077 spark-examples.jar

As soon as we execute the preceding command on our Linux console, Spark will accept our job, start executing it, and print the results of our analysis on the console. The result will be similar to the following screenshot:

[Screenshot: output of the Hive queries on the driver console]

The preceding screenshot shows the output of the Hive queries that we executed in our Spark SQL job on the driver console.
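Apart from the console output, we can also verify that the tables and their data have been created by listing the Hive warehouse directory on HDFS. The following command is a quick check that assumes the default warehouse location (/user/hive/warehouse); the path may differ if it has been overridden in hive-site.xml:

    hadoop fs -ls /user/hive/warehouse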

Spark also provides a utility ($SPARK_HOME/bin/spark-sql) that can be executed on our Linux console; it lets us run all our Hive queries interactively and see the results on the same console. It helps in the quick development of our Hive queries, and we can also analyze the performance of our Hive queries by prefixing them with the explain keyword.
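For example, the following command (a quick sketch that assumes the ChicagoCrimeRecordsAug15 table created earlier is available) uses the -e option of spark-sql to run a single query non-interactively and print its execution plan:

    $SPARK_HOME/bin/spark-sql -e "explain select count(1) from ChicagoCrimeRecordsAug15"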

Note

Refer to https://cwiki.apache.org/confluence/display/Hive/LanguageManual for more information on the syntax of HiveQL.

It is important to mention that Hive on Spark is not the same as Spark on Hive. We discussed the scenarios where we execute Hive queries using the Spark SQL APIs, which is essentially referred to as Spark on Hive. Hive on Spark is a separate discussion, where Spark is added as a third execution engine (apart from MapReduce and Tez) for Apache Hive. Refer to the Apache Hive wiki (https://cwiki.apache.org/confluence/display/Hive) for more information on Hive on Spark.

In this section, we discussed the integration of Spark SQL with Hive with appropriate examples. Let's move on to the next section, where we will talk about performance tuning and best practices for Spark SQL.
