As discussed in Chapter 1, Introduction to Data Analytics with Spark, from the Spark 2.0.0 release the DataFrame remains the primary computation abstraction for Scala, Python, and R, whereas in Java it is replaced by the Dataset. Consequently, the Dataset of type Row will be used throughout this book.
The Dataset is a distributed collection of data structured into rows. It is one of the more convenient ways of interacting with the Spark SQL module.
In other words, it can be considered an equivalent of tabular data in a Relational Database (RDB) format. Like the other data abstractions, DataFrame and RDD, the Dataset can be created from various data sources, such as structured data files (TSV, CSV, JSON, and TXT), Hive tables, secondary storage, external databases, or existing RDDs and DataFrames. However, as of the Spark 2.0.0 release, Java-based computation does not support the DataFrame; if you are developing your applications using Python, Scala, or R, you can still make use of DataFrames.
In the next few sections, you will find the operations and actions available on a Dataset and learn how to create a Dataset from different sources.
As mentioned above, the Dataset is a component of the Spark SQL module, introduced in the Spark 1.6.0 release. Therefore, the entry point of all the functionality starts with the initialization of the Spark SQLContext. Basically, Spark SQL is used for executing SQL queries written either in basic SQL syntax or in HiveQL. A Dataset object is returned when SQL is run from within another programming language. The following code segment initializes the SQLContext from a Spark context. On the other hand, you might require a HiveContext for reading a dataset from Hive; the HiveContext provides a superset of the basic functionality of the SQLContext:
JavaSparkContext sc = new JavaSparkContext("local", "DFDemo");
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
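For the Hive example later in this section, you will also need a HiveContext. A minimal sketch, built on top of the same JavaSparkContext (sc.sc() exposes the underlying SparkContext):

// Hedged sketch: initialize the HiveContext used by the Hive example below
org.apache.spark.sql.hive.HiveContext hiveContext =
    new org.apache.spark.sql.hive.HiveContext(sc.sc());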
For example, suppose you have the JSON file shown here. Now you want to read this file using the SQL context, which returns a DataFrame on which you can perform all the basic SQL operations and the other DSL operations of Spark:
[Input File]
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

[Code]
Dataset<Row> df = sqlContext.read().json("people.json");

[Output: df.show()]
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
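Once the DataFrame is loaded, the same data can be explored with Spark's DSL operations instead of SQL. A short illustrative sketch (the column names come from the JSON file above):

// Project a single column
df.select("name").show();
// Keep only people older than 21
df.filter(df.col("age").gt(21)).show();
// Count occurrences of each age
df.groupBy("age").count().show();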
The following code connects to the Hive context, where a table is created and the people JSON file is loaded into it. The output of the DataFrame will be the same as above.
The code is as follows:
hiveContext.sql("CREATE TEMPORARY TABLE people USING org.apache.spark.sql.json OPTIONS ( path \"people.json\" )");
Dataset<Row> results = hiveContext.sql("SELECT * FROM people");
results.show();
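If Hive is not available, a similar result can be obtained without a HiveContext by registering the DataFrame as a temporary view and querying it through the plain SQL context; a minimal sketch, reusing the df read from people.json above:

// Hedged sketch: register the DataFrame as a temporary view and query it
df.createOrReplaceTempView("people");
Dataset<Row> results = sqlContext.sql("SELECT * FROM people");
results.show();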
In the previous section, we described the pre-processing with the RDD for a practical machine learning application. Now we will work through the same example using the DataFrame (DF) API. You will find it very easy to manipulate the SMSSpamCollection dataset (see http://www.dt.fee.unicamp.br/~tiago/smsspamcollection/). We will show the same example of tokenizing the spam and ham messages to prepare a training set.
This code uses the SparkSession entry point, spark, which we have to initialize before using it.
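A minimal sketch of that initialization, assuming local execution (the application name is a placeholder of ours):

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
    .builder()
    .appName("SMSSpamPreprocessing") // hypothetical application name
    .master("local[*]")              // run locally with all available cores
    .getOrCreate();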
After reading the file as a Dataset, the output is in tabular format with a single column; the default name of this column is value:

// Read as plain text; each line becomes one row in the "value" column
Dataset<Row> df = spark.read().text("input/SMSSpamCollection.txt");
df.show();
Output:
Now split each row into a label part and a message part, creating new rows with the RowFactory class of Spark:

JavaRDD<Row> rowRDD = df.toJavaRDD();
JavaRDD<Row> splitedRDD = rowRDD.map(new Function<Row, Row>() {
  @Override
  public Row call(Row r) throws Exception {
    // The SMSSpamCollection file is tab-delimited: <label>\t<message>
    String[] split = r.getString(0).split("\t");
    return RowFactory.create(split[0], split[1]);
  }
});
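Since the Function interface used by map has a single abstract method, the same transformation can be written more compactly as a Java 8 lambda; a sketch equivalent to the anonymous class above:

JavaRDD<Row> splitedRDD = rowRDD.map((Row r) -> {
  String[] split = r.getString(0).split("\t");
  return RowFactory.create(split[0], split[1]);
});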
List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField("labelString", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("featureString", DataTypes.StringType, true));
org.apache.spark.sql.types.StructType schema = DataTypes.createStructType(fields);
Dataset<Row> schemaSMSSpamCollection = sqlContext.createDataFrame(splitedRDD, schema);
schemaSMSSpamCollection.printSchema();

[Output: schemaSMSSpamCollection.printSchema()]
root
 |-- labelString: string (nullable = true)
 |-- featureString: string (nullable = true)
The next task is to add two new fields: converting labelString to labelDouble and featureString to featureTokens. You can do this similarly to the previous code: after adding the two new fields, create the new schema, then create a new DF by applying an ordinary map transformation to the existing DF. The following code produces a new DF with four columns:

fields.add(DataTypes.createStructField("labelDouble", DataTypes.DoubleType, true));
fields.add(DataTypes.createStructField("featureTokens", DataTypes.StringType, true));
org.apache.spark.sql.types.StructType schemaUpdated = DataTypes.createStructType(fields);
Dataset<Row> newColumnsaddedDF = sqlContext.createDataFrame(
    schemaSMSSpamCollection.javaRDD().map(new Function<Row, Row>() {
      @Override
      public Row call(Row row) throws Exception {
        // Convert the string label to a double: 1.0 for spam, 0.0 for ham
        double label;
        if (row.getString(0).equalsIgnoreCase("spam"))
          label = 1.0;
        else
          label = 0.0;
        // Tokenize the message by whitespace
        String[] split = row.getString(1).split(" ");
        ArrayList<String> tokens = new ArrayList<>();
        for (String s : split)
          tokens.add(s.trim());
        return RowFactory.create(row.getString(0), row.getString(1), label, tokens.toString());
      }
    }), schemaUpdated);

[Output: newColumnsaddedDF.show()]
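As a side note, Spark ML ships a Tokenizer transformer that performs this kind of whitespace tokenization for you, which is usually preferable in real pipelines; a brief sketch (the output column name tokens is ours, and the result is an array-of-strings column rather than a plain string):

import org.apache.spark.ml.feature.Tokenizer;

Tokenizer tokenizer = new Tokenizer()
    .setInputCol("featureString")
    .setOutputCol("tokens");
Dataset<Row> tokenized = tokenizer.transform(schemaSMSSpamCollection);
tokenized.show();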
Now you can apply select, filter, groupBy, and similar operations on a DF. The following code shows some operations on the above DF:

newColumnsaddedDF.select(newColumnsaddedDF.col("labelDouble"), newColumnsaddedDF.col("featureTokens")).show();
newColumnsaddedDF.filter(newColumnsaddedDF.col("labelDouble").gt(0.0)).show();
newColumnsaddedDF.groupBy("labelDouble").count().show();
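These operators can also be chained into a single expression; a short sketch that filters, groups, counts, and orders in one go (col is the static helper from org.apache.spark.sql.functions):

import static org.apache.spark.sql.functions.col;

newColumnsaddedDF
    .filter(col("featureTokens").isNotNull()) // drop rows without tokens
    .groupBy("labelDouble")
    .count()
    .orderBy(col("labelDouble"))
    .show();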
This section will describe how to use SQL queries on a DF and the different ways to create Datasets. Mainly, running SQL queries on a DataFrame and creating a DataFrame from a JavaBean will be discussed in this section. However, interested readers can refer to the Spark programming guidelines for SQL operations in [3].
The SQLContext of Spark has an sql method that enables applications to run SQL queries programmatically; it returns the result as a DataFrame. First register the DF as a temporary view, then filter the spam messages:

newColumnsaddedDF.createOrReplaceTempView("SMSSpamCollection");
Dataset<Row> spam = spark.sqlContext().sql("SELECT * FROM SMSSpamCollection WHERE labelDouble = 1.0");
spam.show();
The following is the output of the preceding code:
Dataset<Row> counts = sqlContext.sql("SELECT labelDouble, COUNT(*) AS count FROM SMSSpamCollection GROUP BY labelDouble");
counts.show();
Output:
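Any SQL construct supported by Spark SQL can be run against the registered view; for instance, the following sketch pulls a few ham messages (the column choice and LIMIT clause are ours):

Dataset<Row> ham = sqlContext.sql(
    "SELECT featureTokens FROM SMSSpamCollection WHERE labelDouble = 0.0 LIMIT 5");
ham.show(false); // pass false to avoid truncating long messages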
You can also create a Dataset from a JavaBean; in that case you don't need to define the schema programmatically. For example, see the Plain Old Java Object (POJO) named SMSSpamBean in the following code:
public class SMSSpamBean implements Serializable {
  private String labelString;
  private String featureString;

  public SMSSpamBean(String labelString, String featureString) {
    super();
    this.labelString = labelString;
    this.featureString = featureString;
  }

  public String getLabelString() {
    return labelString;
  }

  public void setLabelString(String labelString) {
    this.labelString = labelString;
  }

  public String getFeatureString() {
    return featureString;
  }

  public void setFeatureString(String featureString) {
    this.featureString = featureString;
  }
}
Create DF:
JavaRDD<SMSSpamBean> smsSpamBeanRDD = rowRDD.map(new Function<Row, SMSSpamBean>() {
  @Override
  public SMSSpamBean call(Row r) throws Exception {
    // Split each tab-delimited line into <label> and <message>
    String[] split = r.getString(0).split("\t");
    return new SMSSpamBean(split[0], split[1]);
  }
});
Dataset<Row> SMSSpamDF = spark.sqlContext().createDataFrame(smsSpamBeanRDD, SMSSpamBean.class);
SMSSpamDF.show();
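A JavaBean also enables a strongly typed Dataset: instead of falling back to Dataset<Row>, you can keep the bean type by supplying a bean encoder. A sketch, assuming the smsSpamBeanRDD created above:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;

// Create a typed Dataset<SMSSpamBean> from the JavaRDD of beans
Dataset<SMSSpamBean> typedDS = spark.createDataset(
    smsSpamBeanRDD.rdd(), Encoders.bean(SMSSpamBean.class));
typedDS.show();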
The output is as follows: