Dataset basics

As discussed in Chapter 1, Introduction to Data Analytics with Spark, as of the Spark 2.0.0 release the DataFrame remains the primary computation abstraction for Scala, Python, and R, whereas in Java it is replaced by the Dataset. Consequently, a Dataset of type Row will be used throughout this book.

A Dataset is a distributed collection of data structured as Rows. It is one of the more convenient ways of interacting with the Spark SQL module.

In other words, it can be considered an equivalent of tabular data in a Relational Database (RDB) format. Like the other data abstractions, DataFrame and RDD, a Dataset can be created from various data sources such as structured data files (TSV, CSV, JSON, and TXT), Hive tables, secondary storage, external databases, or existing RDDs and DataFrames. However, as of the Spark 2.0.0 release, Java-based computation does not support the DataFrame directly; if you are developing your applications in Python, Scala, or R, you can still make use of DataFrames.

In the next few sections, you will find the operations and actions available on a Dataset, and how to create a Dataset from different sources.

Reading datasets to create the Dataset

As mentioned above, the Dataset is a component of the Spark SQL module, introduced in the Spark 1.6.0 release. Therefore, the entry point of all its functionality is the initialization of the Spark SQLContext. Basically, Spark SQL is used for executing SQL queries written either in basic SQL syntax or in HiveQL.

A Dataset object is returned when SQL is run from another programming language. The following code segment initializes the SQLContext from the Spark context. On the other hand, you might need to have a HiveContext initialized for reading a data set from Hive; HiveContext provides a superset of the basic functionality of SQLContext:

JavaSparkContext sc = new JavaSparkContext("local","DFDemo"); 
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); 
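As mentioned above, reading from Hive requires a HiveContext, which is a superset of SQLContext. The following is a minimal sketch of that initialization, assuming the Hive dependencies are on the classpath (note that in Spark 2.0, HiveContext is deprecated in favour of a SparkSession with Hive support enabled):

// import org.apache.spark.sql.hive.HiveContext; 
HiveContext hiveContext = new org.apache.spark.sql.hive.HiveContext(sc); 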

Reading from the files

For example, suppose you have a JSON file as shown here. You can read this file using the SQL context, which returns a Dataset of Rows on which you can perform all the basic SQL operations and the other DSL operations of Spark:

[Input File] 
{"name":"Michael"} 
{"name":"Andy", "age":30} 
{"name":"Justin", "age":19} 
[Code]    
Dataset<Row> df = sqlContext.read().json("people.json");    
 
[Output: df.show()] 
+----+-------+ 
| age|   name| 
+----+-------+ 
|null|Michael| 
|  30|   Andy| 
|  19| Justin| 
+----+-------+ 
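Once the file has been read, the DSL operations mentioned above can be applied directly to df. The following is only a small illustrative sketch (column names follow the JSON file above):

df.select("name").show(); 
df.filter(df.col("age").gt(20)).show(); 
df.groupBy("age").count().show(); 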

Reading from the Hive

The following code connects through the Hive context, where a temporary table is created and the people JSON file is loaded into it. The output of the resulting DataFrame will be the same as above.

The code is as follows:

// hiveContext is assumed to be an initialized HiveContext, as shown earlier 
hiveContext.sql("CREATE TEMPORARY TABLE people USING org.apache.spark.sql.json " 
    + "OPTIONS ( path \"people.json\" )"); 
Dataset<Row> results = hiveContext.sql("SELECT * FROM people"); 
results.show(); 
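Equivalently, once the temporary table has been registered, it can be loaded back as a Dataset<Row> without writing SQL. A small sketch, assuming the same hiveContext as above:

Dataset<Row> people = hiveContext.table("people"); 
people.show(); 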

Pre-processing with Dataset

In the previous section, we described pre-processing with RDDs for a practical machine learning application. Now we will work through the same example using the DataFrame (DF) API. You will find it very easy to manipulate the SMSSpamCollection dataset (available at http://www.dt.fee.unicamp.br/~tiago/smsspamcollection/). We will show the same example, tokenizing the spam and ham messages to prepare a training set:

  • Reading a Dataset: You can read the dataset using the Spark session variable spark, which has to be initialized before use (a minimal initialization sketch is given after this step). After reading the file as a Dataset, the output is in a tabular format with a single column. The default name of this column is value:
          Dataset<Row> df = spark.read().text("input/SMSSpamCollection.txt"); 
          df.show(); 
    

    Output:


    Figure 7: A snapshot of the SMS spam dataset
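    The spark session variable used in this step has to be created once per application. The following is a minimal initialization sketch, assuming Spark 2.0 and a hypothetical application name:

          // import org.apache.spark.sql.SparkSession; 
          SparkSession spark = SparkSession.builder() 
              .master("local") 
              .appName("SMSSpamPreprocessing")   // hypothetical name 
              .getOrCreate(); 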

  • Create a Row RDD from the existing Dataset: From the preceding output you can see a single column containing each whole line. In order to produce two columns, label and features, we have to split it. Since a Dataset is immutable, you cannot modify its existing columns; you have to create a new Dataset from the existing one. The following code converts the Dataset into an RDD, that is, a collection of Rows. Row is an interface that represents one row of output from a relational operator, and you can create a new Row using Spark's RowFactory class:
             JavaRDD<Row> rowRDD = df.toJavaRDD(); 
    
  • Create a new Row RDD from the existing Row RDD: Once you have the Row RDD, you can perform a normal map operation that still produces Rows, but now with two values each. The following code splits each row and returns a new one:
          JavaRDD<Row> splitedRDD = rowRDD.map(new Function<Row, Row>() { 
               @Override 
              public Row call(Row r) throws Exception { 
                String[] split = r.getString(0).split("\t"); 
                return RowFactory.create(split[0],split[1]); 
              }}); 
    
  • Create a Dataset from the Row RDD: Now you have a Row RDD that contains two values per Row. To create a DF, you have to define the column names (the schema) and their data types. There are two ways to do this: inferring the schema using reflection, or specifying the schema programmatically. The methods are as follows:
    • The first method uses POJO classes, whose field names become the schema
    • The second method creates a list of StructFields by defining the data types and then builds the StructType. For this example, we have used the second method to create the DF from the existing Row RDD, as shown here:
            List<StructField> fields  = new ArrayList<>(); 
            fields.add(DataTypes.createStructField("labelString", 
            DataTypes.StringType, true)); 
            fields.add(DataTypes.createStructField("featureString",  
            DataTypes.StringType, true)); 
            org.apache.spark.sql.types.StructType schema = DataTypes 
            .createStructType(fields); 
            Dataset<Row> schemaSMSSpamCollection = sqlContext 
            .createDataFrame(splitedRDD, schema); 
            schemaSMSSpamCollection.printSchema(); 
            [Output: schemaSMSSpamCollection.printSchema()] 
      

      Figure 8: Schema of the collection

  • Adding a new column: We now have a DF with two columns, but we want to add new columns that convert labelString to labelDouble and featureString to featureTokens. You can do this in a similar way to the previous code: after adding the two new fields, create a new schema, and then create a new DF by applying a normal map transformation to the existing DF (an alternative using the column DSL is sketched after Figure 9). The following code produces a new DF with four columns:
          fields.add(DataTypes.createStructField("labelDouble",  
          DataTypes.DoubleType, true)); 
          fields.add(DataTypes.createStructField("featureTokens",  
          DataTypes.StringType, true)); 
          org.apache.spark.sql.types.StructType schemaUpdated =  
          DataTypes.createStructType(fields); 
          Dataset<Row> newColumnsaddedDF = sqlContext 
          .createDataFrame(schemaSMSSpamCollection.javaRDD().map( 
          new Function<Row, Row>() { 
              @Override 
              public Row call(Row row) throws Exception { 
                double label; 
                if(row.getString(0).equalsIgnoreCase("spam")) 
                  label = 1.0; 
                else 
                  label = 0.0; 
                String[] split = row.getString(1).split(" "); 
                ArrayList<String> tokens = new ArrayList<>(); 
                for(String s:split) 
                  tokens.add(s.trim()); 
                return RowFactory.create(row.getString(0), 
           row.getString(1),label, tokens.toString()); 
              }}), schemaUpdated);   
          [Output: newColumnsaddedDF.show()] 
    

    Figure 9: The dataset after adding a new column
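    As an alternative to the hand-written map function and updated schema above, the same two derived columns can be added with Spark's column DSL. The following is only a sketch of that alternative (it is not the approach used in this example); note that here featureTokens becomes an array column rather than a string:

          // assumes: import static org.apache.spark.sql.functions.*; 
          Dataset<Row> altDF = schemaSMSSpamCollection 
              .withColumn("labelDouble", 
                  when(col("labelString").equalTo("spam"), 1.0).otherwise(0.0)) 
              .withColumn("featureTokens", split(col("featureString"), " ")); 
          altDF.show(); 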

  • Some Dataset operations: For data manipulation, the DF provides a domain-specific language in Java, Scala, and other languages. You can perform select, count, filter, groupBy, and similar operations on a DF (a chained example is sketched after Figure 12). The following code shows some operations on the above DF:
          newColumnsaddedDF.select(newColumnsaddedDF.col("labelDouble"),
          newColumnsaddedDF.col("featureTokens")).show(); 
    

    Figure 10: Dataset showing the label and features

          newColumnsaddedDF.filter(newColumnsaddedDF.col
          ("labelDouble").gt(0.0)).show(); 
    

    Figure 11: Dataset showing that the label has been converted into double value

          newColumnsaddedDF.groupBy("labelDouble").count().show(); 
    

    Figure 12: Showing the Dataset statistics after manipulations
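    These operations can also be chained into a single expression. For example, here is a small sketch that counts the spam messages using the same DF:

          long spamCount = newColumnsaddedDF 
              .filter(newColumnsaddedDF.col("labelDouble").equalTo(1.0)) 
              .count(); 
          System.out.println("Number of spam messages: " + spamCount); 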

More about Dataset manipulation

This section describes how to use SQL queries on a DF and different ways of creating Datasets. Mainly, running SQL queries on a DataFrame and creating a DataFrame from a JavaBean will be discussed here. Interested readers can also refer to the Spark programming guide for SQL operations in [3].

Running SQL queries on Dataset

The SQLContext of Spark has a sql method that enables applications to run SQL queries. This method returns a DataFrame as a result:

  • Filter: First register the DataFrame as a temporary view so that it can be queried by name, using newColumnsaddedDF.createOrReplaceTempView("SMSSpamCollection"), and then select the spam messages:
          Dataset<Row> spam = spark.sqlContext().sql( 
              "SELECT * FROM SMSSpamCollection WHERE labelDouble=1.0"); 
          spam.show();  
    

    The following is the output of the preceding code:


    Figure 13: Using a SQL query to retrieve the same result as Figure 11

  • Count:
          Dataset<Row> counts = sqlContext.sql( 
              "SELECT labelDouble, COUNT(*) AS count FROM SMSSpamCollection " 
              + "GROUP BY labelDouble"); 
          counts.show(); 
    

    Output:


    Figure 14: Showing the Dataset statistics
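    Any SQL supported by Spark can be run against the registered view in the same way. For example, the following sketch returns only the first few spam feature strings (column and view names as registered above):

          Dataset<Row> firstSpam = spark.sqlContext().sql( 
              "SELECT featureString FROM SMSSpamCollection " 
              + "WHERE labelDouble=1.0 LIMIT 5"); 
          firstSpam.show(false); 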

Creating Dataset from the Java Bean

You can also create a Dataset from a JavaBean, in which case you don't need to define the schema programmatically. For example, consider the Plain Old Java Object (POJO) named SMSSpamBean in the following code:

public class SMSSpamBean implements Serializable { 
  private String labelString; 
  private String featureString; 
public SMSSpamBean(String labelString, String featureString) { 
    super(); 
    this.labelString = labelString; 
    this.featureString = featureString; 
  } 
  public String getLabelString() { 
    return labelString; 
  } 
  public void setLabelString(String labelString) { 
    this.labelString = labelString; 
  } 
  public String getFeatureString() { 
    return featureString; 
  } 
  public void setFeatureString(String featureString) { 
    this.featureString = featureString; 
  } 
} 

Create DF:

JavaRDD<SMSSpamBean> smsSpamBeanRDD =  rowRDD.map(new Function<Row, SMSSpamBean>() { 
      @Override 
    public SMSSpamBean call(Row r) throws Exception { 
        String[] split = r.getString(0).split("\t"); 
        return new SMSSpamBean(split[0],split[1]); 
      }});   
Dataset<Row> SMSSpamDF = spark.sqlContext().createDataFrame(smsSpamBeanRDD, SMSSpamBean.class); 
SMSSpamDF.show();   

The output is as follows:


Figure 15: Corresponding feature and label string
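Since Spark 2.0, the same bean class can also back a strongly typed Dataset through a bean encoder. The following is only a sketch of that conversion (it is not part of the original example), applied to the untyped SMSSpamDF created above:

// assumes: import org.apache.spark.sql.Encoders; 
// assumes: import org.apache.spark.api.java.function.FilterFunction; 
Dataset<SMSSpamBean> typedDS = SMSSpamDF.as(Encoders.bean(SMSSpamBean.class)); 
typedDS.filter((FilterFunction<SMSSpamBean>) b -> "spam".equalsIgnoreCase(b.getLabelString())) 
       .show(); 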
