Spark SQL and its DataFrames and Datasets interfaces are the future of Spark performance, with more efficient storage options, an advanced optimizer, and direct operations on serialized data. These components are super important for getting the best of Spark performance (see Figure 3-1).
These are relatively new components; Datasets were introduced in Spark 1.6, DataFrames in Spark 1.3, and the SQL engine in Spark 1.0. This chapter is focused on helping you learn how to best use Spark SQL’s tools and how to intermix Spark SQL with traditional Spark operations.
Spark’s DataFrames have very different functionality compared to traditional DataFrames like those in Pandas and R. While these all deal with structured data, it is important not to depend on your existing intuition surrounding DataFrames.
Like RDDs, DataFrames and Datasets represent distributed collections, with additional schema information not found in RDDs. This additional schema information is used to provide a more efficient storage layer (Tungsten), and in the optimizer (Catalyst) to perform additional optimizations. Beyond schema information, the operations performed on Datasets and DataFrames are such that the optimizer can inspect the logical meaning rather than arbitrary functions. DataFrames are Datasets of a special Row object, which doesn’t provide any compile-time type checking. The strongly typed Dataset API shines especially for use with more RDD-like functional operations. Compared to working with RDDs, DataFrames allow Spark’s optimizer to better understand our code and our data, which allows for a new class of optimizations we explore in “Query Optimizer”.

While Spark SQL, DataFrames, and Datasets provide many excellent enhancements, they still have some rough edges compared to traditional processing with “regular” RDDs. The Dataset API, being brand new at the time of this writing, is likely to experience some changes in future versions.
Much as the SparkContext is the entry point for all Spark applications, and the StreamingContext is for all streaming applications, the SparkSession serves as the entry point for Spark SQL. Like with all of the Spark components, you need to import a few extra components as shown in Example 3-1. If you are using the Spark Shell you will automatically get a SparkSession called spark to accompany the SparkContext called sc.
```scala
import org.apache.spark.sql.{Dataset, DataFrame, SparkSession, Row}
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
```
Scala’s type alias of DataFrame = Dataset[Row] is broken in Java—you must use Dataset<Row> instead.
SparkSession is generally created using the builder pattern, along with getOrCreate(), which will return an existing session if one is already running. The builder can take string-based configuration keys config(key, value), and shortcuts exist for a number of common params. One of the more important shortcuts is enableHiveSupport(), which will give you access to Hive UDFs and does not require a Hive installation—but does require certain extra JARs (discussed in “Spark SQL Dependencies”). Example 3-2 shows how to create a SparkSession with Hive support. The enableHiveSupport() shortcut not only configures Spark SQL to use these Hive JARs, but it also eagerly checks that they can be loaded—leading to a clearer error message than setting configuration values by hand. In general, using the shortcuts listed in the API docs is advised when they are present, since no checking is done in the generic config interface.
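As a quick sketch of the builder pattern (the application name and memory setting here are illustrative, not from the book), a session mixing a checked shortcut with generic config values might look like:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical example: mix a checked shortcut (enableHiveSupport) with
// generic string-based config values, which are not checked up front.
val session = SparkSession.builder()
  .appName("example-app")                 // shortcut for spark.app.name
  .config("spark.executor.memory", "2g")  // generic key/value config
  .enableHiveSupport()                    // checked shortcut
  .getOrCreate()
```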
```scala
val session = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()
// Import the implicits, unlike in core Spark the implicits are defined
// on the context.
import session.implicits._
```
When using getOrCreate, if an existing session exists your configuration values may be ignored and you will simply get the existing SparkSession. Some options, like master, will also only apply if there is no existing SparkContext running; otherwise, the existing SparkContext will be used.
Before Spark 2.0, instead of the SparkSession, two separate entry points (HiveContext or SQLContext) were used for Spark SQL. The names of these entry points can be a bit confusing, and it is important to note the HiveContext does not require a Hive installation. The primary reason to use the SQLContext is if you have conflicts with the Hive dependencies that cannot be resolved. The HiveContext has a more complete SQL parser compared to the SQLContext as well as additional user-defined functions (UDFs).1 Example 3-4 shows how to create a legacy HiveContext.

The SparkSession should be preferred when possible, followed by the HiveContext, then SQLContext. Not all libraries, or even all Spark code, has been updated to take the SparkSession and in some cases you will find functions that still expect a SQLContext or HiveContext. If you need to construct one of the legacy interfaces (SQLContext or HiveContext) the additional imports in Example 3-3 will be useful.
```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver._
```
Getting a HiveContext or SQLContext from a SparkSession is not well supported outside of the org.apache.spark scope—however, getOrCreate can be used.
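As a sketch of that approach, the legacy companion object’s getOrCreate reuses a context tied to an existing SparkContext (assumed here to be called sc, as in the shell); in Spark 2.x it wraps a SparkSession under the hood:

```scala
import org.apache.spark.sql.SQLContext

// Reuse an existing SQLContext for `sc`, or create one if needed.
val sqlContext = SQLContext.getOrCreate(sc)
```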
```scala
val hiveContext = new HiveContext(sc)
// Import the implicits, unlike in core Spark the implicits are defined
// on the context.
import hiveContext.implicits._
```
Like the other components in Spark, using Spark SQL requires adding additional dependencies. If you have conflicts with the Hive JARs you can’t fix through shading, you can simply limit yourself to the spark-sql JAR—although you then won’t have access to the Hive dependencies without also including the spark-hive JAR.
To enable Hive support in SparkSession or use the HiveContext you will need to add both Spark’s SQL and Hive components to your dependencies. For Maven-compatible build systems, the coordinates for Spark’s SQL and Hive components in 2.2.0 are org.apache.spark:spark-sql_2.11:2.2.0 and org.apache.spark:spark-hive_2.11:2.2.0. Example 3-5 shows how to add them to a “regular” sbt build, and Example 3-6 shows the process for Maven users.
```scala
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.2.0",
  "org.apache.spark" %% "spark-hive" % "2.2.0")
```
```xml
<dependency> <!-- Spark dependency -->
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.2.0</version>
</dependency>
<dependency> <!-- Spark dependency -->
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.11</artifactId>
  <version>2.2.0</version>
</dependency>
```
While managing these dependencies by hand isn’t particularly challenging, sometimes mistakes can be made when updating versions. The sbt-spark-package plug-in can simplify managing Spark dependencies. This plug-in is normally used for creating community packages (discussed in “Creating a Spark Package”), but it can also assist in building software that depends on Spark. To add the plug-in to your sbt build you need to create a project/plugins.sbt file and make sure it contains the code in Example 3-7.
```scala
resolvers += "Spark Package Main Repo" at
  "https://dl.bintray.com/spark-packages/maven"

addSbtPlugin("org.spark-packages" % "sbt-spark-package" % "0.2.5")
```
If you are starting a new Spark project, there is a g8 template, holdenk/sparkProjectTemplate, you can use to bootstrap your new project by running sbt new holdenk/sparkProjectTemplate.

For spark-packages to work you will need to specify a Spark version and at least one Spark component (core), which can be done in sbt settings as shown in Example 3-8.
```scala
sparkVersion := "2.2.0"
sparkComponents ++= Seq("core")
```
Once you have sbt-spark-package installed and set up, you can add the Spark components by just adding SQL and Hive to your list of sparkComponents as shown in Example 3-9.
```scala
sparkComponents ++= Seq("sql", "hive", "hive-thriftserver")
```
While it’s not required, if you do have an existing Hive Metastore to which you wish to connect with Spark, you can copy your hive-site.xml to Spark’s conf/ directory. The default Hive Metastore version is 1.2.1. For other versions of the Hive Metastore you will need to set the spark.sql.hive.metastore.version property to the desired version as well as set spark.sql.hive.metastore.jars to either “maven” (to have Spark retrieve the JARs) or the system path where the Hive JARs are present.
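A sketch of what this can look like (the metastore version shown is illustrative, not from the book):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical: connect to a Hive 1.1.0 metastore, letting Spark
// download the matching Hive JARs from Maven.
val session = SparkSession.builder()
  .enableHiveSupport()
  .config("spark.sql.hive.metastore.version", "1.1.0")
  .config("spark.sql.hive.metastore.jars", "maven")
  .getOrCreate()
```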
If you can’t include the Hive dependencies with your application, you can leave out Spark’s Hive component and instead create a SQLContext, as shown in Example 3-10. This provides much of the same functionality, but uses a less capable SQL parser and lacks certain Hive-based user-defined functions (UDFs) and user-defined aggregate functions (UDAFs).
```scala
val sqlContext = new SQLContext(sc)
// Import the implicits, unlike in core Spark the implicits are defined
// on the context.
import sqlContext.implicits._
```
As with the core SparkContext and StreamingContext, the Hive/SQLContext is used to load your data. JSON is a very popular format, in part because it can be easily loaded in many languages, and is at least semi–human-readable. Some of the sample data we’ve included in the book is in JSON format for exactly these reasons. JSON is especially interesting since it lacks schema information, and Spark needs to do some work to infer the schema from our data. JSON can also be expensive to parse; in some simple cases the cost of parsing the input JSON data can be greater than that of the actual operation. We will cover the full loading and saving API for JSON in “JSON”, but to get started, let’s load a sample we can use to explore the schema (see Example 3-11).
```scala
val df1 = session.read.json(path)
```
Feel free to load your own JSON data, but if you don’t have any handy to test with, check out the examples GitHub resources directory. Now that you’ve got the JSON data loaded you can start by exploring what schema Spark has managed to infer for your data. The schema information, and the optimizations it enables, is one of the core differences between Spark SQL and core Spark. Inspecting the schema is especially useful for DataFrames since you don’t have the templated type you do with RDDs or Datasets. Schemas are normally handled automatically by Spark SQL, either inferred when loading the data or computed based on the parent DataFrames and the transformation being applied.

DataFrames expose the schema in both human-readable and programmatic formats. printSchema() will show us the schema of a DataFrame and is most commonly used when working in the shell to figure out what you are working with. This is especially useful for data formats, like JSON, where the schema may not be immediately visible by looking at only a few records or reading a header. For programmatic usage, you can get the schema by simply calling schema, which is often used in ML pipeline transformers.
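As a small sketch, assuming df is any loaded DataFrame, the two forms look like:

```scala
// Human-readable tree, most useful in the shell.
df.printSchema()

// Programmatic access: schema is a StructType you can inspect,
// for example inside an ML pipeline stage.
val schema = df.schema
val fieldNames = schema.fieldNames
```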
Since you are likely familiar with case classes and JSON, let’s examine how the equivalent Spark SQL schema would be represented in Examples 3-12 and 3-13.
```json
{"name":"mission",
 "pandas":[{"id":1,"zip":"94110","pt":"giant","happy":true,
            "attributes":[0.4,0.5]}]}
```
```scala
case class RawPanda(id: Long, zip: String, pt: String,
                    happy: Boolean, attributes: Array[Double])
case class PandaPlace(name: String, pandas: Array[RawPanda])
```
Now with the case classes defined you can create a local instance, turn it into a Dataset, and print the schema as shown in Example 3-14, resulting in Example 3-15. The same can be done with the JSON data, but requires some configuration as discussed in “JSON”.
```scala
def createAndPrintSchema() = {
  val damao = RawPanda(1, "M1B 5K7", "giant", true, Array(0.1, 0.1))
  val pandaPlace = PandaPlace("toronto", Array(damao))
  val df = session.createDataFrame(Seq(pandaPlace))
  df.printSchema()
}
```
```
root
 |-- name: string (nullable = true)
 |-- pandas: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = false)
 |    |    |-- zip: string (nullable = true)
 |    |    |-- pt: string (nullable = true)
 |    |    |-- happy: boolean (nullable = false)
 |    |    |-- attributes: array (nullable = true)
 |    |    |    |-- element: double (containsNull = false)
```
In addition to the human-readable schema, the schema information is also available for you to use programmatically. The programmatic schema is returned as a StructField, as shown in Example 3-16.
```scala
case class StructField(
  name: String,
  dataType: DataType,
  nullable: Boolean = true,
  metadata: Metadata = Metadata.empty)
....
```
Example 3-17 shows the same schema as Example 3-15, in machine-readable format.
```
org.apache.spark.sql.types.StructType = StructType(
  StructField(name,StringType,true),
  StructField(pandas,
    ArrayType(
      StructType(StructField(id,LongType,false),
        StructField(zip,StringType,true),
        StructField(pt,StringType,true),
        StructField(happy,BooleanType,false),
        StructField(attributes,ArrayType(DoubleType,false),true)),
      true),true))
```
From here you can dive into what this schema information means and look at how to construct more complex schemas. The first part is a StructType, which contains a list of fields. It’s important to note you can nest StructTypes, like how a case class can contain additional case classes. The fields in the StructType are defined with StructField, which specifies the name, type (see Tables 3-1 and 3-2 for a listing of types), and a Boolean indicating if the field may be null/missing.
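For instance, a schema equivalent to the RawPanda case class could be assembled by hand like this (a sketch; the nullability choices mirror the inferred schema in Example 3-15):

```scala
import org.apache.spark.sql.types._

val pandaSchema = StructType(List(
  StructField("id", LongType, nullable = false),
  StructField("zip", StringType, nullable = true),
  StructField("pt", StringType, nullable = true),
  StructField("happy", BooleanType, nullable = false),
  StructField("attributes",
    ArrayType(DoubleType, containsNull = false), nullable = true)))
```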
| Scala type | SQL type | Details |
|---|---|---|
| Byte | ByteType | 1-byte signed integers (–128,127) |
| Short | ShortType | 2-byte signed integers (–32768,32767) |
| Int | IntegerType | 4-byte signed integers (–2147483648,2147483647) |
| Long | LongType | 8-byte signed integers (–9223372036854775808,9223372036854775807) |
| java.math.BigDecimal | DecimalType | Arbitrary precision signed decimals |
| Float | FloatType | 4-byte floating-point number |
| Double | DoubleType | 8-byte floating-point number |
| Array[Byte] | BinaryType | Array of bytes |
| Boolean | BooleanType | true/false |
| java.sql.Date | DateType | Date without time information |
| java.sql.Timestamp | TimestampType | Date with time information (second precision) |
| String | StringType | Character string values (stored as UTF8) |
| Scala type | SQL type | Details |
|---|---|---|
| Array[T] | ArrayType(elementType, containsNull) | Array of single type of element, containsNull true if any elements are null |
| Map[K, V] | MapType(keyType, valueType, valueContainsNull) | Key/value map, valueContainsNull true if any values are null |
| case class | StructType(List[StructField]) | Named fields of possible heterogeneous types, similar to a case class or JavaBean. |
As you saw in Example 3-17, you can nest StructFields and all of the complex Spark SQL types. Now that you’ve got an idea of how to understand and, if needed, specify schemas for your data, you are ready to start exploring the DataFrame interfaces.

Spark SQL schemas are eagerly evaluated, unlike the data underneath. If you find yourself in the shell and uncertain of what a transformation will do, try it and print the schema. See Example 3-15.
Spark SQL’s DataFrame API allows us to work with DataFrames without having to register temporary tables or generate SQL expressions. The DataFrame API has both transformations and actions. The transformations on DataFrames are more relational in nature, with the Dataset API (covered next) offering a more functional-style API.

Transformations on DataFrames are similar in concept to RDD transformations, but with a more relational flavor. Instead of specifying arbitrary functions, which the optimizer is unable to introspect, you use a restricted expression syntax so the optimizer can have more information. As with RDDs, we can broadly break down transformations into simple single DataFrame, multiple DataFrame, key/value, and grouped/windowed transformations. Spark SQL transformations are only partially lazy; the schema is eagerly evaluated.

Simple DataFrame transformations allow us to do most of the standard things one can do when working a row at a time.2 You can still do many of the same operations defined on RDDs, except using Spark SQL expressions instead of arbitrary functions. To illustrate this we will start by examining the different kinds of filter operations available on DataFrames.
DataFrame functions, like filter, accept Spark SQL expressions instead of lambdas. These expressions allow the optimizer to understand what the condition represents, and with filter, it can often be used to skip reading unnecessary records. To get started, let’s look at a SQL expression to filter our data for unhappy pandas using our existing schema. The first step is looking up the column that contains this information. In our case it is happy, and for our DataFrame (called df) we access the column through the apply function (e.g., df("happy")). The filter expression requires the expression to return a boolean value, and if you wanted to select happy pandas, the entire expression could consist of just retrieving the column value. However, since we want to find the unhappy pandas, we can check to see that happy isn’t true using the !== operator as shown in Example 3-18.
```scala
pandaInfo.filter(pandaInfo("happy") !== true)
```
To look up the column, we can either provide the column name on the specific DataFrame or use the implicit $ operator for column lookup. This is especially useful when the DataFrame is anonymous. The ! binary negation function can be used together with $ to simplify our expression from Example 3-18 down to df.filter(!$("happy")). This illustrates how to access a specific column from a DataFrame. For accessing other structures inside of DataFrames, like nested structs, keyed maps, and array elements, use the same apply syntax. So, if the first element in the attributes array represents squishiness, and you only want very squishy pandas, you can access that element by writing df("attributes")(0) >= 0.5.
Our expressions need not be limited to a single column. You can compare multiple columns in our “filter” expression. Complex filters like that shown in Example 3-19 are more difficult to push down to the storage layer, so you may not see the same speedup over RDDs that you see with simpler filters.
```scala
pandaInfo.filter(
  pandaInfo("happy").and(
    pandaInfo("attributes")(0) > pandaInfo("attributes")(1)))
```
Spark SQL’s column operators are defined on the column class, so a filter containing the expression 0 >= df.col("friends") will not compile, since Scala will use the >= defined on 0. Instead you would write df.col("friends") <= 0 or convert 0 to a column literal with lit.3
Spark SQL’s DataFrame API has a very large set of operators available. You can use all of the standard mathematical operators on floating points, along with the standard logical and bitwise operations (prefixed with bitwise to distinguish them from the logical versions). Columns use === and !== for equality to avoid conflict with Scala internals. For columns of strings, startsWith/endsWith, substr, like, and isNull are all available. The full set of operations is listed in org.apache.spark.sql.Column and covered in Table 3-3 and Table 3-4.
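A few illustrative string filters on the zip column of our running panda example (a sketch; the results naturally depend on your data):

```scala
// Pandas in zip codes starting with "94".
df.filter(df("zip").startsWith("94"))
// The same condition expressed as a SQL LIKE pattern.
df.filter(df("zip").like("94%"))
// Pandas with a missing zip code.
df.filter(df("zip").isNull)
```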
| Scala operator | Java equivalent | Input column types | Output type | Purpose |
|---|---|---|---|---|
| !== | notEqual | Any | Boolean | Check if expressions not equal |
| % | mod | Numeric | Numeric | Modulo |
| && | and | Boolean | Boolean | Boolean and |
| * | multiply | Numeric | Numeric | Multiply expressions |
| + | plus | Numeric | Numeric | Sum expression |
| - | minus | Numeric | Numeric | Subtraction |
| unary_- | negate | Numeric | Numeric | Unary subtraction |
| / | divide | Numeric | Double | Division |
| < | lt | Comparable | Boolean | Less than |
| <= | leq | Comparable | Boolean | Less than or equal to |
| === | equalTo | Any | Any | Equality test (unsafe on null values) |
| <=> | eqNullSafe | Any | Any | Equality test (safe on null values) |
| > | gt | Comparable | Boolean | Greater than |
| >= | geq | Comparable | Boolean | Greater than or equal to |
| Operator | Input column types | Output type | Purpose |
|---|---|---|---|
| apply | Complex types | Type of field accessed | Get value from complex type (e.g., structfield/map lookup or array index) |
| bitwiseAND | Integral type a | Same as input | Computes bitwise and |
| bitwiseOR | Integral type a | Same as input | Computes bitwise or |
| bitwiseXOR | Integral type a | Same as input | Computes bitwise exclusive or |

a Integral types include ByteType, IntegerType, LongType, and ShortType.
Not all Spark SQL expressions can be used in every API call. For example, Spark SQL joins do not support complex operations, and filter requires that the expression result in a boolean value.
In addition to the operators directly specified on the column, an even larger set of functions on columns exists in org.apache.spark.sql.functions, some of which we cover in Tables 3-5, 3-6, and 3-7. Keep in mind that these functions are called on columns, not values.
| Function name | Purpose | Input types |
|---|---|---|
| lit(value) | Convert a Scala symbol to a column literal | Column & Symbol |
| array | Create a new array column | Must all have the same Spark SQL type |
| isNaN | Check if not a number | Numeric |
| not | Opposite value | Boolean |
| Function name | Purpose | Input types |
|---|---|---|
| abs | Absolute value | Numeric |
| sqrt | Square root | Numeric |
| acos | Inverse cosine | Numeric |
| asin | Inverse sine | Numeric |
| atan | Inverse tangent | Numeric |
| cbrt | Cube root | Numeric |
| ceil | Ceiling | Numeric |
| cos | Cosine | Numeric |
| sin | Sine | Numeric |
| tan | Tangent | Numeric |
| exp | Exponent | Numeric |
| floor | Floor | Numeric |
| least | Minimum value | Numerics |
Beyond simply filtering out data, you can also produce a DataFrame with new columns or updated values in old columns. Spark uses the same expression syntax we discussed for filter, except instead of having to include a condition (like testing for equality), the results are used as values in the new DataFrame. To see how you can use select on complex and regular data types, Example 3-20 uses the Spark SQL explode function to turn an input DataFrame of PandaPlaces into a DataFrame of just PandaInfo as well as computing the “squishness” to “hardness” ratio of each panda.
```scala
val pandaInfo = pandaPlace.explode(pandaPlace("pandas")){
  case Row(pandas: Seq[Row]) =>
    pandas.map{
      case (Row(
        id: Long,
        zip: String,
        pt: String,
        happy: Boolean,
        attrs: Seq[Double])) =>
        RawPanda(id, zip, pt, happy, attrs.toArray)
    }}
pandaInfo.select(
  (pandaInfo("attributes")(0) / pandaInfo("attributes")(1))
    .as("squishyness"))
```
When you construct a sequence of operations, the generated column names can quickly become unwieldy, so the as or alias operators are useful to specify the resulting column name.

While all of these operations are quite powerful, sometimes the logic you wish to express is more easily encoded with if/else semantics. Example 3-21 is a simple example of this, and it encodes the different types of panda as a numeric value.4 The when and otherwise functions can be chained together to create the same effect.
```scala
/**
 * Encodes pandaType to Integer values instead of String values.
 *
 * @param pandaInfo the input DataFrame
 * @return Returns a DataFrame of pandaId and integer value for pandaType.
 */
def encodePandaType(pandaInfo: DataFrame): DataFrame = {
  pandaInfo.select(pandaInfo("id"),
    (when(pandaInfo("pt") === "giant", 0).
      when(pandaInfo("pt") === "red", 1).
      otherwise(2)).as("encodedType")
  )
}
```
Spark SQL also provides special tools for handling missing, null, and invalid data. By using isNaN or isNull along with filters, you can create conditions for the rows you want to keep. For example, if you have a number of different columns, perhaps with different levels of precision (some of which may be null), you can use coalesce(c1, c2, ..) to return the first nonnull column. Similarly, for numeric data, nanvl returns the first non-NaN value (e.g., nanvl(0/0, sqrt(-2), 3) results in 3). To simplify working with missing data, the na function on DataFrame gives us access to some common routines for handling missing data in DataFrameNaFunctions.
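A few sketches of these routines (the column names c1, c2, and happy are placeholders for illustration):

```scala
import org.apache.spark.sql.functions.coalesce

// Per-row first non-null value across two columns.
df.select(coalesce(df("c1"), df("c2")))
// Drop rows that contain any null or NaN values.
df.na.drop()
// Only consider nulls in the listed columns when dropping.
df.na.drop(Seq("happy"))
// Replace nulls in numeric columns with a default.
df.na.fill(0.0)
```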
Sometimes applying a row-by-row decision, as you can with filter, isn’t enough. Spark SQL also allows us to select the unique rows by calling dropDuplicates, but as with the similar operation on RDDs (distinct), this can require a shuffle, so is often much slower than filter. Unlike with RDDs, dropDuplicates can optionally drop rows based on only a subset of the columns, such as an ID field, as shown in Example 3-22.
```scala
pandas.dropDuplicates(List("id"))
```
This leads nicely into our next section on aggregates and groupBy, since often the most expensive component of each is the shuffle.
Spark SQL has many powerful aggregates, and thanks to its optimizer it can be easy to combine many aggregates into one single action/query. Like with Pandas’ DataFrames, groupBy returns special objects on which we can ask for certain aggregations to be performed. In pre-2.0 versions of Spark, this was a generic GroupedData, but in versions 2.0 and beyond, groupBy on DataFrames is the same as groupBy on Datasets. Aggregations on Datasets have extra functionality, returning a GroupedDataset (in pre-2.0 versions of Spark) or a KeyValueGroupedDataset when grouped with an arbitrary function, and a RelationalGroupedDataset when grouped with a relational/Dataset DSL expression. The additional typed functionality is discussed in “Grouped Operations on Datasets”, and the common “untyped” DataFrame and Dataset groupBy functionality is explored here.
min, max, avg, and sum are all implemented as convenience functions directly on GroupedData, and more can be specified by providing the expressions to agg. Example 3-23 shows how to compute the maximum panda size by zip code. Once you specify the aggregates you want to compute, you can get the results back as a DataFrame. If you’re used to RDDs you might be concerned by groupBy, but it is now a safe operation on DataFrames thanks to the Spark SQL optimizer, which automatically pipelines our reductions, avoiding giant shuffles and mega records.
```scala
def maxPandaSizePerZip(pandas: DataFrame): DataFrame = {
  pandas.groupBy(pandas("zip")).max("pandaSize")
}
```
While Example 3-23 computes the max on a per-key basis, these aggregates can also be applied over the entire DataFrame or all numeric columns in a DataFrame. This is often useful when trying to collect some summary statistics for the data with which you are working. In fact, there is a built-in describe transformation that does just that, although it can also be limited to certain columns. It is used in Example 3-24 and returns the result shown in Example 3-25.
```scala
// Compute the count, mean, stddev, min, max summary stats for all
// of the numeric fields of the provided panda infos. non-numeric
// fields (such as string (name) or array types) are skipped.
val df = pandas.describe()
// Collect the summary back locally
println(df.collect())
```
Array([count,3,3], [mean,1.3333333333333333,5.0], [stddev,0.5773502691896258,4.358898943540674], [min,1,2], [max,2,10])
The behavior of groupBy has changed between Spark versions. Prior to Spark 1.3 the values of the grouping columns are discarded by default, while post 1.3 they are retained. The configuration parameter, spark.sql.retainGroupColumns, can be set to false to force the earlier functionality.
For computing multiple different aggregations, or more complex aggregations, you should use the agg API on the GroupedData instead of directly calling count, mean, or similar convenience functions. For the agg API, you either supply a list of aggregate expressions, a string representing the aggregates, or a map of column names to aggregate function names. Once we’ve called agg with the requested aggregates, we get back a regular DataFrame with the aggregated results. As with regular functions, they are listed in the org.apache.spark.sql.functions Scaladoc. Table 3-8 lists some common and useful aggregates. For our example results in these tables we will consider a DataFrame with the schema of name field (as a string) and age (as an integer), both nullable with values ({"ikea", null}, {"tube", 6}, {"real", 30}).
Example 3-26 shows how to compute both the min and mean for the pandaSize column on our running panda example. Computing multiple aggregates with Spark SQL can be much simpler than doing the same tasks with the RDD API.
```scala
def minMeanSizePerZip(pandas: DataFrame): DataFrame = {
  // Compute the min and mean
  pandas.groupBy(pandas("zip")).agg(
    min(pandas("pandaSize")), mean(pandas("pandaSize")))
}
```
| Function name | Purpose | Storage requirement | Input types |
|---|---|---|---|
| approxCountDistinct | Count approximate distinct values in column a | Configurable through rsd (which controls error rate) | All |
| avg | Average | Constant | Numeric |
| count | Count number of items (excluding nulls). Special case of “*” counts number of rows | Constant | All |
| countDistinct | Count distinct values in column | O(distinct elems) | All |
| first | Return the first element b | Constant | All |
| last | Return the last element | Constant | All |
| stddev | Sample standard deviation c | Constant | Numeric |
| stddev_pop | Population standard deviation c | Constant | Numeric |
| sum | Sum of the values | Constant | Numeric |
| sumDistinct | Sum of the distinct values | O(distinct elems) | Numeric |
| min | Select the minimum value | Constant | Sortable data |
| max | Select the maximum value | Constant | Sortable data |
| mean | Select the mean value | Constant | Numeric |

a Implemented with HyperLogLog: https://en.wikipedia.org/wiki/HyperLogLog. b This was commonly used in early versions of Spark SQL where the grouping column was not preserved. c Added in Spark 1.6.
In addition to using aggregates on groupBy, you can run the same aggregations on multidimensional cubes with cube and rollups with rollup.
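For example, the per-zip max from Example 3-23 can be extended over hierarchical and multidimensional groupings (a sketch; the pt grouping column here is illustrative):

```scala
// Max panda size by zip and panda type, plus subtotals per rollup level.
pandas.rollup(pandas("zip"), pandas("pt")).max("pandaSize")
// Max panda size over every combination of the grouping columns.
pandas.cube(pandas("zip"), pandas("pt")).max("pandaSize")
```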
If the built-in aggregation functions don’t meet your needs, you can extend Spark SQL using UDFs as discussed in “Extending with User-Defined Functions and Aggregate Functions (UDFs, UDAFs)”, although things can be more complicated for aggregate functions.
Spark SQL 1.4.0 introduced windowing functions to allow us to more easily work with ranges or windows of rows. When creating a window you specify what columns the window is over, the order of the rows within each partition/group, and the size of the window (e.g., K rows before and J rows after OR range between values). If it helps to think of this visually, Figure 3-2 shows a sample window and its results. Using this specification each input row is related to some set of rows, called a frame, that is used to compute the resulting aggregate. Window functions can be very useful for things like computing average speed with noisy data, relative sales, and more. A window for pandas by age is shown in Example 3-27.
```scala
val windowSpec = Window
  .orderBy(pandas("age"))
  .partitionBy(pandas("zip"))
  .rowsBetween(start = -10, end = 10) // can use rangeBetween for range instead
```
Once you’ve defined a window specification you can compute a function over it, as shown in Example 3-28. Spark’s existing aggregate functions, covered in “Aggregates and groupBy”, can be computed on an aggregation over the window. Window operations are very useful for things like Kalman filtering or many types of relative analysis.
```scala
val pandaRelativeSizeCol = pandas("pandaSize") -
  avg(pandas("pandaSize")).over(windowSpec)

pandas.select(pandas("name"), pandas("zip"), pandas("pandaSize"),
  pandas("age"), pandaRelativeSizeCol.as("panda_relative_size"))
```
As of this writing, windowing functions require Hive support to be enabled or the use of a HiveContext.
Sorting supports multiple columns in ascending or descending order, with ascending as the default. These sort orders can be intermixed, as shown in Example 3-29. Spark SQL has some extra benefits for sorting as some serialized data can be compared without deserialization.
```scala
pandas.orderBy(pandas("pandaSize").asc, pandas("age").desc)
```
When limiting results, sorting is often used to bring back only the top or bottom K results.
You specify the number of rows with limit(numRows)
to restrict the number of rows in the DataFrame
.
Limits are also sometimes used without sorting, for debugging, to bring back a small result.
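As a quick sketch (reusing the pandas DataFrame from the earlier examples), combining a sort with a limit brings back only the top K rows:

```scala
// Bring back only the 10 largest pandas; pandaSize is the same
// column used in the sorting example above.
pandas.orderBy(pandas("pandaSize").desc).limit(10)
```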
If, instead of limiting the number of rows based on a sort order, you want to sample your data, “Sampling” covers techniques for Spark SQL sampling as well.
Beyond single DataFrame
transformations you can perform operations that depend on multiple DataFrames
.
The ones that first pop into our heads are most likely the different types of joins,
which are covered in Chapter 4, but beyond that you can also perform a number of set-like operations between DataFrames
.
The DataFrame
set-like operations allow us to perform many operations that are most commonly thought of as set operations.
These operations behave a bit differently than traditional set operations since we don’t have the restriction of unique elements.
While you are likely already familiar with the results of set-like operations from regular Spark and Learning Spark, it’s important to review the
cost of these operations in Table 3-9.
Operation name | Cost |
---|---|
unionAll | Low |
intersect | Expensive |
except | Expensive |
distinct | Expensive |
Sometimes, it’s better to use regular SQL queries instead of building up our operations on DataFrames
.
If you are connected to a Hive Metastore you can directly write SQL queries against the Hive tables and get the results as a DataFrame
.
If you have a DataFrame
you want to write SQL queries against, you can register it as a temporary table, as shown in Example 3-30 (or save it as a managed table if you intend to reuse it between jobs).
Datasets
can also be converted back to DataFrames
and registered for querying against.
```scala
def registerTable(df: DataFrame): Unit = {
  df.registerTempTable("pandas")
  df.write.saveAsTable("perm_pandas")
}
```
Querying tables is the same, regardless of whether it is a temporary table, existing Hive table, or newly saved Spark table, and is illustrated in Example 3-31.
```scala
def querySQL(): DataFrame = {
  sqlContext.sql("SELECT * FROM pandas WHERE size > 0")
}
```
In addition to registering tables you can also write queries directly against a specific file path, as shown in Example 3-32.
```scala
def queryRawFile(): DataFrame = {
  sqlContext.sql("SELECT * FROM parquet.`path_to_parquet_file`")
}
```
DataFrames
are more than RDDs of Row
objects; DataFrames
and Datasets
have a specialized representation and columnar cache format.
The specialized representation is not only more space efficient, but also can be much faster to encode than even Kryo serialization.
To be clear, like RDDs, DataFrames
and Datasets
are generally lazily evaluated and build up a lineage of their dependencies (except in DataFrames
this is called a logical plan and contains more information).
Tungsten is a new Spark SQL component that provides more efficient Spark operations by working directly at the byte level.
Looking back on Figure 3-1, we can take a closer look at the space differences between the RDDs and DataFrames
when cached in Figure 3-3.
Tungsten includes specialized in-memory data structures tuned for the types of operations required by Spark,
improved code generation, and a specialized wire protocol.
For those coming from Hadoop, you can think of Tungsten data types as being WritableComparable
types on steroids.
Tungsten’s representation is substantially smaller than objects serialized using Java or even Kryo serializers. As Tungsten does not depend on Java objects, both on-heap and off-heap allocations are supported. Not only is the format more compact, but serialization times can be substantially faster than with native serialization.
Since Tungsten no longer depends on working with Java objects, you can use either on-heap (in the JVM) or off-heap storage. If you use off-heap storage, it is important to leave enough room in your containers for the off-heap allocations, which you can get an approximate idea for from the web UI.
Tungsten’s data structures are also created closely in mind with the kind of processing for which they are used. The classic example of this is with sorting, a common and expensive operation. The on-wire representation is implemented so that sorting can be done without having to deserialize the data again.
In the future Tungsten may make it more feasible to use certain non-JVM libraries. For many simple operations the cost of using BLAS, or similar linear algebra packages, from the JVM is dominated by the cost of copying the data off-heap.
By avoiding the memory and GC overhead of regular Java objects, Tungsten is able to process larger datasets than the same handwritten aggregations.
Tungsten became the default in Spark 1.5 and can be enabled in earlier versions by setting spark.sql.tungsten.enabled
to true (or disabled in later versions by setting this to false).
Even without Tungsten, Spark SQL uses a columnar storage format with Kryo serialization to minimize storage cost.
Spark SQL has a different way of loading and saving data than core Spark. To be able to push down certain types of operations to the storage layer, Spark SQL has its own Data Source API. Data sources are able to specify and control which type of operations should be pushed down to the data source. As developers, you don’t need to worry too much about the internal activity going on here, unless the data sources you are looking for are not supported.
Data loading in Spark SQL is not quite as lazy as in regular Spark, but is still generally lazy. You can verify this by quickly trying to load from a data source that doesn’t exist.
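For example, the following sketch (assuming a session as in the other examples) fails at load() time rather than at the first action, because Spark eagerly reads some data to determine the schema:

```scala
// Throws at load() for a missing path; contrast with core Spark, where
// a bad input path often only fails once an action is run.
val df = session.read.format("json").load("file:///path/that/does/not/exist")
```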
The DataFrameWriter and the DataFrameReader cover writing and reading from external data sources.
The DataFrameWriter
is accessed by calling write
on a DataFrame
or Dataset
.
The DataFrameReader
can be accessed through read
on a SQLContext
.
Spark SQL updated the load/save API in Spark 1.4, so you may see code still using the old-style API without the DataFrame
reader or writer classes, but under the hood it is implemented as a wrapper around the new API.
When reading or writing you specify the format by calling format(formatName)
on the DataFrameWriter
/DataFrameReader
.
Format-specific parameters, such as number of records to be sampled for JSON, are specified by either providing a map of options with options
or
setting option-by-option with option
on the reader/writer.
The first-party formats JSON
, JDBC
, ORC
, and Parquet
methods are directly defined on the reader/writers taking the path or connection info. These methods are for convenience only and are wrappers around the more general methods we illustrate in this chapter.
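For instance, these two ways of loading JSON are equivalent; the json(path) method is simply a convenience wrapper (sketch, assuming session and path as in the nearby examples):

```scala
val viaWrapper = session.read.json(path)
val viaGeneral = session.read.format("json").load(path)
```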
Loading and writing JSON is supported directly in Spark SQL, and despite the lack of schema information in JSON, Spark SQL is able to infer a schema for us by sampling the records.
Loading JSON data is more expensive than loading many data sources, since Spark needs to read some of the records to determine the schema information.
If the schema between records varies widely (or the number of records is very small), you can increase the percentage of records read to determine the schema by setting samplingRatio
to a higher value, as in Example 3-33 where we set the sample ratio to 100%
.
```scala
val df2 = session.read.format("json")
  .option("samplingRatio", "1.0").load(path)
```
Spark’s schema inference can be a compelling reason to use Spark for processing JSON data, even if the data size could be handled on a single node.
Since our input may contain some invalid JSON records we may wish to filter out, we can also take in an RDD of strings. This allows us to load the input as a standard text file, filter out our invalid records, and then load the data into JSON. This is done by using the built-in json
function on the DataFrameReader
, which takes RDDs or paths and is shown in Example 3-34.
Methods for converting RDDs of regular objects are covered in “RDDs”.
```scala
val rdd: RDD[String] = input.filter(_.contains("panda"))
val df = session.read.json(rdd)
```
The JDBC data source represents a natural Spark SQL data source, one that supports many of the same operations.
Since different database vendors have slightly different JDBC implementations, you need to add the JAR for your JDBC data sources.
Since SQL field types vary as well, Spark uses JdbcDialects
with built-in dialects for DB2, Derby, MsSQL, MySQL, Oracle, and Postgres.
While Spark supports many different JDBC sources, it does not ship with the JARs required to talk to all of these databases.
If you are submitting your Spark job with spark-submit
you can download the required JARs to the host you are launching and include them by specifying --jars
or supply the Maven coordinates to --packages
.
Since the Spark Shell is also launched this way, the same syntax works and you can use it to include the MySQL JDBC JAR in Example 3-35.
```shell
spark-submit --jars ./resources/mysql-connector-java-5.1.38.jar $ASSEMBLY_JAR $CLASS
```
In earlier versions of Spark --jars
does not include the JAR in the driver’s class path. If this is the case for your cluster you must also specify the same JAR to --driver-class-path
.
JdbcDialects allow Spark to correctly map the JDBC types to the corresponding Spark SQL types.
If there isn’t a JdbcDialect
for your database vendor, the default dialect will be used, which will likely work for many of the types.
The dialect is automatically chosen based on the JDBC URL used.
If you find yourself needing to customize the JdbcDialect
for your database vendor, you can look for a package on Spark Packages or extend the JdbcDialect
class and register your own dialect.
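A minimal sketch of a custom dialect might look like the following; the "jdbc:mydb" URL prefix is hypothetical, and a real dialect would usually also override the type-mapping methods:

```scala
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

object MyDialect extends JdbcDialect {
  // Claim URLs for our hypothetical database.
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:mydb")
  // Override getCatalystType/getJDBCType here for any types the
  // default dialect maps incorrectly.
}

// Register so it is picked up automatically based on the JDBC URL.
JdbcDialects.registerDialect(MyDialect)
```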
As with the other built-in data sources, there exists a convenience wrapper for specifying the properties required to load JDBC data, illustrated in Example 3-36.
The convenience wrapper JDBC
accepts the URL, table, and a java.util.Properties
object for connection properties (such as authentication information).
The properties object is merged with the properties that are set on the reader/writer itself. While the properties object is required, an empty properties object can be provided and properties instead specified on the reader/writer.
```scala
session.read.jdbc("jdbc:dialect:serverName;user=user;password=pass",
  "table", new Properties)

session.read.format("jdbc")
  .option("url", "jdbc:dialect:serverName")
  .option("dbtable", "table").load()
```
The API for saving a DataFrame
is very similar to the API used for loading. The save()
function needs no path since the information is already specified, as illustrated in Example 3-37, just as with loading.
```scala
df.write.jdbc("jdbc:dialect:serverName;user=user;password=pass",
  "table", new Properties)

df.write.format("jdbc")
  .option("url", "jdbc:dialect:serverName")
  .option("user", "user")
  .option("password", "pass")
  .option("dbtable", "table").save()
```
In addition to reading and writing JDBC data sources, Spark SQL can also run its own JDBC server (covered in “JDBC/ODBC Server”).
Apache Parquet files are a common format directly supported in Spark SQL, and they are incredibly space-efficient and popular.
Apache Parquet’s popularity comes from a number of features, including the ability to easily split across multiple files, compression, nested types, and many others discussed in the Parquet documentation.
Since Parquet is such a popular format, there are some additional options available in Spark for the reading and writing of Parquet files.
These options are listed in Table 3-10.
Unlike third-party data sources, these options are mostly configured on the SQLContext
, although some can be configured on either the SQLContext
or DataFrameReader/Writer
.
SQLConf | DataFrameReader/Writer option | Default | Purpose |
---|---|---|---|
spark.sql.parquet.mergeSchema | mergeSchema | False | Control if schema should be merged between partitions when reading. Can be expensive, so disabled by default in 1.5.0. |
spark.sql.parquet.binaryAsString | N/A | False | Treat binary data as strings. Old versions of Spark wrote strings as binary data. |
spark.sql.parquet.cacheMetadata | N/A | True | Cache Parquet metadata, normally safe unless underlying data is being modified by another process. |
spark.sql.parquet.compression.codec | N/A | Gzip | Specify the compression codec for use with Parquet data. Valid options are uncompressed, snappy, gzip, or lzo. |
spark.sql.parquet.filterPushdown | N/A | True | Push down filters to Parquet (when possible).a |
spark.sql.parquet.writeLegacyFormat | N/A | False | Write in Parquet metadata in the legacy format. |
spark.sql.parquet.output.committer.class | N/A | org.apache.parquet.hadoop.ParquetOutputCommitter | Output committer used by Parquet. If writing to S3 you may wish to try org.apache.spark.sql.parquet.DirectParquetOutputCommitter. |
a Pushdown means evaluate at the storage, so with Parquet this can often mean skipping reading unnecessary rows or files. |
Reading Parquet from an old version of Spark requires some special options, as shown in Example 3-38.
```scala
def loadParquet(path: String): DataFrame = {
  // Configure Spark to read binary data as string,
  // note: must be configured on session.
  session.conf.set("spark.sql.parquet.binaryAsString", "true")
  // Load parquet data using merge schema (configured through option)
  session.read
    .option("mergeSchema", "true")
    .format("parquet")
    .load(path)
}
```
Writing parquet with the default options is quite simple, as shown in Example 3-39.
```scala
def writeParquet(df: DataFrame, path: String) = {
  df.write.format("parquet").save(path)
}
```
Interacting with Hive tables adds another option beyond the other formats.
As covered in “Plain Old SQL Queries and Interacting with Hive Data”, one option for bringing in data from a Hive table is writing a SQL query against it and having the result as a DataFrame
.
The DataFrame
’s reader and writer interfaces can also be used with Hive tables, as with the rest of the data sources, as illustrated in Example 3-40.
```scala
def loadHiveTable(): DataFrame = {
  session.read.table("pandas")
}
```
When loading a Hive table Spark SQL will convert the metadata and cache the result. If the underlying metadata has changed you can use sqlContext.refreshTable("tablename")
to update the metadata, or the caching can be disabled by setting spark.sql.parquet.cacheMetadata
to false
.
Saving a managed table is a bit different, and is illustrated in Example 3-41.
```scala
def saveManagedTable(df: DataFrame): Unit = {
  df.write.saveAsTable("pandas")
}
```
Unless specific conditions are met, the result saved to a Hive managed table will be saved in a Spark-specific format that other tools may not be able to understand.
Spark SQL DataFrames
can easily be converted to RDDs of Row
objects, and can also be created from RDDs of Row
objects as well as JavaBeans, Scala case classes, and tuples.
For RDDs of strings in JSON format, you can use the methods discussed in “JSON”.
Datasets
of type T can also easily be converted to RDDs of type T, which can provide a useful bridge for DataFrames
to RDDs of concrete case classes instead of Row
objects.
RDDs are a special-case data source, since when going to/from RDDs, the data remains inside of Spark without writing out to or reading from an external system.
Converting a DataFrame
to an RDD is a transformation (not an action); however, converting an RDD to a DataFrame
or Dataset
may involve computing (or sampling some of) the input RDD.
Creating a DataFrame
from an RDD is not free in the general case. The data must be converted into Spark SQL’s internal format.
When you create a DataFrame
from an RDD, Spark SQL needs to add schema information.
If you are creating the DataFrame
from an RDD of case classes or plain old Java objects (POJOs), Spark SQL is able to use reflection to automatically determine the schema, as shown in Example 3-42.
You can also manually specify the schema for your data using the structure discussed in “Basics of Schemas”.
This can be especially useful if some of your fields are not nullable.
You must specify the schema yourself if Spark SQL is unable to determine the schema through reflection, such as an RDD of Row
objects (perhaps from calling .rdd
on a DataFrame
to use a functional transformation, as shown in Example 3-42).
```scala
def createFromCaseClassRDD(input: RDD[PandaPlace]) = {
  // Create DataFrame explicitly using session and schema inference
  val df1 = session.createDataFrame(input)

  // Create DataFrame using session implicits and schema inference
  val df2 = input.toDF()

  // Create a Row RDD from our RDD of case classes
  val rowRDD = input.map(pm => Row(pm.name,
    pm.pandas.map(pi => Row(pi.id, pi.zip, pi.happy, pi.attributes))))

  val pandasType = ArrayType(StructType(List(
    StructField("id", LongType, true),
    StructField("zip", StringType, true),
    StructField("happy", BooleanType, true),
    StructField("attributes", ArrayType(FloatType), true))))

  // Create DataFrame explicitly with specified schema
  val schema = StructType(List(StructField("name", StringType, true),
    StructField("pandas", pandasType)))

  val df3 = session.createDataFrame(rowRDD, schema)
}
```
Case classes or JavaBeans defined inside another class can sometimes cause problems. If your RDD conversion is failing, make sure the case class being used isn’t defined inside another class.
Converting a DataFrame
to an RDD is incredibly simple; however, you get an RDD of Row
objects, as shown in Example 3-43.
Since a row can contain anything, you need to specify the type (or cast the result) as you fetch the values for each column in the row.
With Datasets
you can directly get back an RDD templated on the same type, which can make the conversion back to a useful RDD much simpler.
While Scala has many implicit conversions for different numeric types, these do not generally apply in Spark SQL; instead, we use explicit casting.
```scala
def toRDD(input: DataFrame): RDD[RawPanda] = {
  val rdd: RDD[Row] = input.rdd
  rdd.map(row => RawPanda(row.getAs[Long](0), row.getAs[String](1),
    row.getAs[String](2), row.getAs[Boolean](3),
    row.getAs[Array[Double]](4)))
}
```
If you know that the schema of your DataFrame
matches that of another, you can use the existing schema when constructing your new DataFrame
. One common place where this occurs is when an input DataFrame
has been converted to an RDD for functional filtering and then back.
Much like with RDDs, you can also create DataFrames
from local collections and bring them back as local collections, as illustrated in Example 3-44.
The same memory requirements apply; namely, the entire contents of the DataFrame
will be in-memory in the driver program.
As such, distributing local collections is normally limited to unit tests, or joining small datasets with larger distributed datasets.
```scala
def createFromLocal(input: Seq[PandaPlace]) = {
  session.createDataFrame(input)
}
```
The LocalRelation API we used here allows us to specify a schema in the same manner as when we are converting an RDD to a DataFrame.
In pre-1.6 versions of PySpark, schema inference only looked at the first record.
Collecting data back as a local collection is more common and often done post aggregations or filtering on the data.
For example, with ML pipelines you might collect the coefficients, or (as discussed in our Goldilocks example in Chapter 6) the quantiles, to the driver.
Example 3-45 shows how to collect a DataFrame
back locally.
For larger datasets, saving to an external storage system (such as a database or HDFS) is recommended.
Just as with RDDs, do not collect large DataFrames
back to the driver. For Python users, it is important to remember that toPandas()
collects the data locally.
```scala
def collectDF(df: DataFrame) = {
  val result: Array[Row] = df.collect()
  result
}
```
As with core Spark, the data formats that ship directly with Spark only begin to scratch the surface of the types of systems with which you can interact. Some vendors publish their own implementations, and many are published on Spark Packages. As of this writing there are over twenty formats listed on the Data Source’s page with the most popular being Avro, Redshift, CSV, and a unified wrapper around 6+ databases called deep-spark.
Spark packages can be included in your application in a few different ways.
During the exploration phase (e.g., using the shell) you can include them by specifying --packages
on the command line, as in Example 3-46.
The same approach can be used when submitting your application with spark-submit
, but this only includes the package at runtime, not at compile time.
For including at compile time you can add the Maven coordinates to your builds, or, if building with sbt, the sbt-spark-package plug-in
simplifies package dependencies with spDependencies
.
Otherwise, manually listing them as in Example 3-47 works quite well.
Spark CSV is now included as part of Spark 2.0+, so you only need to include this for earlier versions of Spark.
```shell
./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.5.0
```
"com.databricks"
%
"spark-csv_2.11"
%
"1.5.0"
Once you’ve included the package with your Spark job you need to specify the format, as you did with the Spark provided ones.
The name should be mentioned in the package’s documentation. For spark-csv
you would specify a format string of com.databricks.spark.csv
.
For the built-in CSV
format (in Spark 2.0+) you would instead just use csv
(or the full name org.apache.spark.sql.csv
).
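Putting this together, loading CSV data might look like the following sketch; the header option and filename are illustrative:

```scala
// Spark 2.0+ built-in csv format; for earlier versions substitute
// the spark-csv package's format string, com.databricks.spark.csv.
val df = session.read.format("csv")
  .option("header", "true")
  .load("pandas.csv")
```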
There are a few options if the data format you are looking for isn’t directly supported in either Spark or one of the libraries. Since many formats are available as Hadoop input formats, you can try to load your data as a Hadoop input format and convert the resulting RDD as discussed in “RDDs”. This approach is relatively simple, but means Spark SQL is unable to push down operations to our data store.
For a deeper integration you can implement your data source using the Data Source API.
Depending on which operations you wish to support operator push-down for, in your base relation you will need to implement additional traits from the org.apache.spark.sql.sources
package.
The details of implementing a new Spark SQL data source are beyond the scope of this book, but if you are interested the Scaladoc for org.apache.spark.sql.sources
and spark-csv
’s CsvRelation can be good ways to get started.
In core Spark, saving RDDs always requires that the target directory does not exist, which can make appending to existing tables challenging.
With Spark SQL, you can specify the desired behavior when writing out to a path that may already have data.
The default behavior is SaveMode.ErrorIfExists
; matching the behavior of RDDs, Spark will throw an exception if the target already exists. The different save modes and their behaviors are listed in Table 3-11.
Example 3-48 illustrates how to configure an alternative save mode.
Save Mode | Behavior |
---|---|
ErrorIfExists | Throws an exception if the target already exists. If target doesn’t exist write the data out. |
Append | If target already exists, append the data to it. If the data doesn’t exist write the data out. |
Overwrite | If the target already exists, delete the target. Write the data out. |
Ignore | If the target already exists, silently skip writing out. Otherwise write out the data. |
```scala
def writeAppend(input: DataFrame): Unit = {
  input.write.mode(SaveMode.Append).save("output/")
}
```
Partitioned data is an important part of Spark SQL since it powers one of the key optimizations to allow reading only the required data, discussed more in “Logical and Physical Plans”. If you know how your downstream consumers may access your data (e.g., reading data based on zip code), when you write your data it is beneficial to use that information to partition your output. When reading the data, it’s useful to understand how partition discovery functions, so you can have a better understanding of whether your filter can be pushed down.
Filter push-down can make a huge difference when working with large datasets by allowing Spark to only access the subset of data required for your computation instead of doing effectively a full table scan.
When reading partitioned data, you point Spark to the root path of your data, and it will automatically discover the different partitions. Not all data types can be used as partition keys; currently only strings and numeric data are the supported types.
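As a sketch, reading back data partitioned on a hypothetical zipcode column only requires pointing at the root path; a filter on the partition column can then be pruned down to the matching directories:

```scala
val df = session.read.format("json").load("output/")
// Only the partitions for this zipcode need to be read.
val sanFranciscoPandas = df.filter(df("zipcode") === "94110")
```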
If your data is all in a single DataFrame
, the DataFrameWriter
API makes it easy to specify the partition information while you are writing the data out.
The partitionBy
function takes a list of columns to partition the output on, as shown in Example 3-49.
You can also manually save out separate DataFrames
(say if you are writing from different jobs) with individual save
calls.
```scala
def writeOutByZip(input: DataFrame): Unit = {
  input.write.partitionBy("zipcode").format("json").save("output/")
}
```
In addition to splitting the data by a partition key, it can be useful to make sure the resulting file sizes are reasonable, especially if the results will be used downstream by another Spark job.
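One hedged approach is to repartition before writing so each output file ends up a reasonable size; the target of 10 partitions here is purely illustrative and should be tuned to your data:

```scala
input.repartition(10)
  .write.partitionBy("zipcode").format("json").save("output/")
```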
Datasets
are an exciting extension of Spark SQL that provide additional compile-time type checking.
Starting in Spark 2.0, DataFrames
are now a specialized version of Datasets
that operate on generic Row
objects and therefore lack the normal compile-time type checking of Datasets
.
Datasets
can be used when your data can be encoded for Spark SQL and you know the type information at compile time.
The Dataset API is a strongly typed collection with a mixture of relational (DataFrame
) and functional (RDD) transformations.
Like DataFrames
, Datasets
are represented by a logical plan the Catalyst optimizer (see “Query Optimizer”) can work with, and when cached the data is stored in Spark SQL’s internal encoding format.
The Dataset API is new in Spark 1.6 and will change in future versions. Users of the Dataset API are advised to treat it as a “preview.” Up-to-date documentation on the Dataset API can be found in the Scaladoc.
Datasets
can be easily converted to/from DataFrames
and RDDs, but in the initial version they do not directly extend either.
Converting to/from RDDs involves encoding/decoding the data into a different form.
Converting to/from DataFrames
is almost “free” in that the underlying data does not need to be changed; only extra compile-time type information is added/removed.
In Spark 2.0 the DataFrame
type has been replaced with a type alias to Dataset[Row]
.
The type alias for DataFrame
is not visible in Java for Spark 2.0, so updating Java code will require changing from DataFrame
to Dataset<Row>
.
To convert a DataFrame
to a Dataset
you can use the as[ElementType]
function on the DataFrame
to get a Dataset[ElementType]
back as shown in Example 3-50.
The ElementType
must be a case class, or similar such as tuple, consisting of types Spark SQL can represent (see “Basics of Schemas”).
To create Datasets
from local collections, createDataset(...)
on the SQLContext
and the toDS()
implicit function are provided on Seqs in the same manner as createDataFrame(...)
and toDF()
.
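A quick sketch, assuming RawPanda is a Spark SQL–representable case class as in the other examples and that the session implicits are imported:

```scala
import session.implicits._

// Sample data matching the field types used in the RawPanda examples.
val localPandas = Seq(RawPanda(1L, "94110", "giant", true, Array(0.1, 0.2)))
val ds1 = session.createDataset(localPandas)
val ds2 = localPandas.toDS()
```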
For converting from RDD to Dataset
you can first convert from RDD to DataFrame
and then convert it to a Dataset
.
For loading data into a Dataset
, unless a special API is provided by your data source, you can first load your data into a DataFrame
and then convert it to a Dataset
. Since the conversion to the Dataset
simply adds information, you do not have the problem of eagerly evaluating, and future filters and similar operations can still be pushed down to the data store.
```scala
def fromDF(df: DataFrame): Dataset[RawPanda] = {
  df.as[RawPanda]
}
```
Converting from a Dataset
back to an RDD or DataFrame
can be done in similar ways as when converting DataFrames
, and both are shown in Example 3-51.
The toDF
simply copies the logical plan used in the Dataset
into a DataFrame
—so you don’t need to do any schema inference or conversion as you do when converting from RDDs.
Converting a Dataset
of type T
to an RDD of type T
can be done by calling .rdd
, which unlike calling toDF
, does involve converting the data from the internal SQL format to the regular types.
```scala
/**
 * Illustrate converting a Dataset to an RDD
 */
def toRDD(ds: Dataset[RawPanda]): RDD[RawPanda] = {
  ds.rdd
}

/**
 * Illustrate converting a Dataset to a DataFrame
 */
def toDF(ds: Dataset[RawPanda]): DataFrame = {
  ds.toDF()
}
```
One of the reasons to use Datasets
over traditional DataFrames
is their compile-time strong typing.
DataFrames
have runtime schema information but lack compile-time information about the schema.
This strong typing is especially useful when making libraries, because you can more clearly specify the requirements of your inputs and your return types.
One of the key advantages of the Dataset API is easier integration with custom Scala and Java code.
Datasets
expose filter
, map
, mapPartitions
, and flatMap
with similar function signatures as RDDs, with the notable requirement that your return ElementType
also be understandable by Spark SQL (such as tuple or case class of types discussed in “Basics of Schemas”).
Example 3-52 illustrates this using a simple map
function.
```scala
def funMap(ds: Dataset[RawPanda]): Dataset[Double] = {
  ds.map{rp => rp.attributes.filter(_ > 0).sum}
}
```
Beyond functional transformations, such as map
and filter
, you can also intermix relational and grouped/aggregate operations.
Datasets
introduce a typed version of select
for relational-style transformations.
When specifying an expression for this you need to include the type information, as shown in Example 3-53.
You can add this information by calling as[ReturnType]
on the expression/column.
```scala
def squishyPandas(ds: Dataset[RawPanda]): Dataset[(Long, Boolean)] = {
  ds.select($"id".as[Long], ($"attributes"(0) > 0.5).as[Boolean])
}
```
Some operations, such as select
, have both typed and untyped implementations. If you supply a Column
rather than a TypedColumn
you will get a DataFrame
back instead of a Dataset
.
In addition to single Dataset
transformations, there are also transformations for working with multiple Datasets
. The standard set operations, namely intersect
, union
, and subtract
, are all available with the same standard caveats as discussed in Table 3-9.
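As a sketch, with two Datasets ds1 and ds2 of the same type:

```scala
val combined = ds1.union(ds2)     // like unionAll, keeps duplicates; low cost
val shared   = ds1.intersect(ds2) // expensive, requires comparing elements
val onlyIn1  = ds1.subtract(ds2)  // expensive, requires comparing elements
```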
Joining Datasets
is also supported, but to make the type information easier to work with, the return structure is a bit different than traditional SQL joins.
Similar to grouped operations on DataFrames (described in “Aggregates and groupBy”), groupBy on Datasets returns a GroupedDataset (prior to Spark 2.0) or a KeyValueGroupedDataset when grouped with an arbitrary function, and a RelationalGroupedDataset when grouped with a relational/Dataset DSL expression.
You can specify your aggregate functions on all of these, along with a functional mapGroups
API.
As with the expression in “Relational Transformations”, you need to use typed expressions so the result can also be a Dataset
.
Taking our previous example of computing the maximum panda size by zip in Example 3-23, you would rewrite it to be as shown in Example 3-54.
The convenience functions found on GroupedData
(e.g., min
, max
, etc.) are missing, so all of our aggregate expressions need to be specified through agg
.
```scala
def maxPandaSizePerZip(ds: Dataset[RawPanda]): Dataset[(String, Double)] = {
  ds.map(rp => MiniPandaInfo(rp.zip, rp.attributes(2)))
    .groupByKey(mp => mp.zip).agg(max("size").as[Double])
}
```
Beyond applying typed SQL expressions to aggregated columns, you can also easily use arbitrary Scala code with mapGroups on grouped data, as shown in Example 3-55.
This can save us from having to write custom user-defined aggregate functions (UDAFs) (discussed in "Extending with User-Defined Functions and Aggregate Functions (UDFs, UDAFs)").
While custom UDAFs can be painful to write, they may be able to give better performance than mapGroups and can also be used on DataFrames.
def maxPandaSizePerZipScala(ds: Dataset[RawPanda]): Dataset[(String, Double)] = {
  ds.groupByKey(rp => rp.zip).mapGroups { case (g, iter) =>
    (g, iter.map(_.attributes(2)).reduceLeft(Math.max(_, _)))
  }
}
User-defined functions and user-defined aggregate functions provide you with ways to extend the DataFrame and SQL APIs with your own custom code while still taking advantage of the Catalyst optimizer. The Dataset API (see "Datasets") is another performant option for much of what you can do with UDFs and UDAFs. This is quite useful for performance, since otherwise you would need to convert the data to an RDD (and potentially back again) to perform arbitrary functions, which is quite expensive. UDFs and UDAFs can also be accessed from inside of regular SQL expressions, making them accessible to analysts or others more comfortable with SQL.
When using UDFs or UDAFs written in non-JVM languages, such as Python, it is important to note that you lose much of the performance benefit, as the data must still be transferred out of the JVM.
If most of your work is in Python but you want to access some UDFs without the performance penalty, you can write your UDFs in Scala and register them for use in Python (as done in Sparkling Pandas).8
Writing nonaggregate UDFs for Spark SQL is incredibly simple: you write a regular function and register it using sqlContext.udf().register.
A simple string length UDF is shown in Example 3-56.
If you are registering a Java or Python UDF you also need to specify your return type.
def setupUDFs(sqlCtx: SQLContext) = {
  sqlCtx.udf.register("strLen", (s: String) => s.length())
}
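Once registered, the UDF can be called from SQL like any built-in function. A sketch, assuming a hypothetical pandas temp table with a string name column has already been registered:

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}

// "pandas" is a hypothetical registered temp table with a string
// name column; strLen is registered as in Example 3-56.
def queryWithUDF(sqlCtx: SQLContext): DataFrame = {
  sqlCtx.udf.register("strLen", (s: String) => s.length())
  sqlCtx.sql("SELECT name, strLen(name) AS len FROM pandas")
}
```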
Even with JVM languages UDFs are generally slower than the equivalent SQL expression would be if it exists. Some early work is being done in SPARK-14083 to parse JVM byte code and generate SQL expressions.
Aggregate functions (or UDAFs) are somewhat trickier to write.
Instead of writing a regular Scala function, you extend the UserDefinedAggregateFunction and implement a number of different functions, similar to the functions one might write for aggregateByKey on an RDD, except working with different data structures.
While they can be complex to write, UDAFs can be quite performant compared with options like mapGroups on Datasets or even simply written aggregateByKey on RDDs.
You can then either use the UDAF directly on columns or add it to the function registry as you did for the nonaggregate UDF.
Example 3-57 is a simple UDAF for computing the average, although you will likely want to use Spark's built-in avg in real life.
def setupUDAFs(sqlCtx: SQLContext) = {
  class Avg extends UserDefinedAggregateFunction {
    // Input type
    def inputSchema: org.apache.spark.sql.types.StructType =
      StructType(StructField("value", DoubleType) :: Nil)

    def bufferSchema: StructType = StructType(
      StructField("count", LongType) ::
      StructField("sum", DoubleType) :: Nil
    )

    // Return type
    def dataType: DataType = DoubleType

    def deterministic: Boolean = true

    def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer(0) = 0L
      buffer(1) = 0.0
    }

    def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      buffer(0) = buffer.getAs[Long](0) + 1
      buffer(1) = buffer.getAs[Double](1) + input.getAs[Double](0)
    }

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      buffer1(0) = buffer1.getAs[Long](0) + buffer2.getAs[Long](0)
      buffer1(1) = buffer1.getAs[Double](1) + buffer2.getAs[Double](1)
    }

    def evaluate(buffer: Row): Any = {
      buffer.getDouble(1) / buffer.getLong(0)
    }
  }
  // Optionally register
  val avg = new Avg
  sqlCtx.udf.register("ourAvg", avg)
}
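Once registered, the aggregate can be used from SQL just like a built-in aggregate. A sketch, assuming setupUDAFs has been run (so "ourAvg" exists in the registry) and a hypothetical pandas temp table with zip (string) and size (double) columns has been registered:

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}

// Assumes "ourAvg" was registered by setupUDAFs and that a
// hypothetical "pandas" temp table with zip and size columns exists.
def averageSizes(sqlCtx: SQLContext): DataFrame = {
  sqlCtx.sql("SELECT zip, ourAvg(size) AS avgSize FROM pandas GROUP BY zip")
}
```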
This is a little more complicated than our regular UDF, so let's take a look at what the different parts do.
You start by specifying what the input type is, then you specify the schema of the buffer you will use for storing the in-progress work.
These schemas are specified in the same way as DataFrame and Dataset schemas, discussed in "Basics of Schemas".
From there, the remaining functions mirror those you would write for aggregateByKey on an RDD, but instead of taking arbitrary Scala objects you work with Row and MutableAggregationBuffer. The final evaluate function takes the Row representing the aggregation data and returns the final result.
UDFs, UDAFs, and Datasets all provide ways to intermix arbitrary code with Spark SQL.
Catalyst is the Spark SQL query optimizer, which is used to take the query plan and transform it into an execution plan that Spark can run.
Much as our transformations on RDDs build up a DAG, as we apply relational and functional transformations on DataFrames/Datasets, Spark SQL builds up a tree representing our query plan, called a logical plan. Spark is able to apply a number of optimizations on the logical plan and can also choose between multiple physical plans for the same logical plan using a cost-based model.
The logical plan you construct through transformations on DataFrames/Datasets (or SQL queries) starts out as an unresolved logical plan. Much like a compiler, the Spark optimizer is multiphased, and before any optimizations can be performed, it needs to resolve the references and types of the expressions.
This resolved plan is referred to as the logical plan, and Spark applies a number of simplifications directly on the logical plan, producing an optimized logical plan.
These simplifications can be written using pattern matching on the tree, such as the rule for simplifying additions between two literals. The optimizer is not limited to pattern matching, and rules can also include arbitrary Scala code.
Once the logical plan has been optimized, Spark will produce a physical plan. The physical plan stage has both rule-based and cost-based optimizations to produce the optimal physical plan. One of the most important optimizations at this stage is predicate pushdown to the data source level.
As a final step, Spark may also apply code generation for the components. Code generation is done using Janino to compile Java code. Earlier versions used Scala’s Quasi Quotes,9 but the overhead was too high to enable code generation for small datasets. In some TPCDS queries, code generation can result in >10× improvement in performance.
In some early versions of Spark, code generation could cause failures for complex queries. If you are on an old version of Spark and run into an unexpected failure, it can be worth disabling codegen by setting spark.sql.codegen or spark.sql.tungsten.enabled to false (depending on version).
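For example, on a 1.5/1.6 cluster this could be set directly on the context (a sketch; verify which property your version actually uses):

```scala
import org.apache.spark.sql.SQLContext

// A sketch: Spark 1.5/1.6 use spark.sql.tungsten.enabled, while
// earlier 1.x versions use spark.sql.codegen instead.
def disableCodegen(sqlCtx: SQLContext): Unit = {
  sqlCtx.setConf("spark.sql.tungsten.enabled", "false")
}
```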
While the Catalyst optimizer is quite powerful, one of the cases where it currently runs into challenges is with very large query plans.
These query plans tend to be the result of iterative algorithms, like graph algorithms or machine learning algorithms.
One simple workaround for this is converting the data to an RDD and back to a DataFrame/Dataset at the end of each iteration, as shown in Example 3-58.
Although if you’re in Python, be sure to use the underlying Java RDD rather than round-tripping through Python (see Example 7-5 for how to do this).
Another, somewhat more heavy option, is to write the data to storage and continue from there.
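A sketch of that heavier storage-based variant, assuming a durable path you control; writing out and reading back also truncates the query plan, at the cost of a full round trip through storage:

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}

// Persisting to Parquet and reading back yields a DataFrame whose
// plan starts fresh from the file scan.
def truncatePlanViaStorage(sqlCtx: SQLContext, df: DataFrame, path: String): DataFrame = {
  df.write.parquet(path)
  sqlCtx.read.parquet(path)
}
```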
val rdd = df.rdd
rdd.cache()
sqlCtx.createDataFrame(rdd, df.schema)
This issue is being tracked in SPARK-13346 and you can see the workaround used in GraphFrames.
While Spark SQL's query optimizer has access to much more information, we still sometimes need to take a peek under the hood to make sure it's working as we expected. Similar to toDebugString on RDDs, we have the explain and printSchema functions on DataFrames.
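For example, a quick way to inspect a plan (a sketch, assuming a hypothetical DataFrame with a numeric size column; the exact output format varies by version):

```scala
import org.apache.spark.sql.DataFrame

// Prints the schema, then the plan for a filtered query; in the
// physical plan, a pushed-down filter shows up in the data source
// scan (e.g., as PushedFilters for Parquet).
def inspectPlan(df: DataFrame): Unit = {
  df.printSchema()
  df.filter(df("size") > 12).explain()
}
```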
One thing that can make a big difference is figuring out if Spark SQL was able to push down a filter. In early versions of Spark SQL filter pushdown didn’t always happen as expected, so the filter sometimes needed to be reordered to be right next to the data load. In newer versions filter pushdown is more likely to fail due to a misconfiguration of your data source.
Spark SQL provides a JDBC server to allow external tools, such as business intelligence GUIs like Tableau, to work with data accessible in Spark and to share resources. Spark SQL’s JDBC server requires that Spark be built with Hive support.
Since the server tends to be long lived and runs on a single context, it can also be a good way to share cached tables between multiple users.
Spark SQL’s JDBC server is based on the HiveServer2 from Hive, and most corresponding connectors designed for HiveServer2 can be used directly with Spark SQL’s JDBC server. Simba also offers specific drivers for Spark SQL.
The server can either be started from the command line or started using an existing HiveContext.
The command-line start and stop commands are ./sbin/start-thriftserver.sh and ./sbin/stop-thriftserver.sh.
When starting from the command line, you can configure the different Spark SQL properties by specifying --hiveconf property=value on the command line. Many of the rest of the command-line parameters match those of spark-submit.
The default host and port is localhost:10000 and can be configured with hive.server2.thrift.port and hive.server2.thrift.bind.host.
When starting the JDBC server using an existing HiveContext, you can simply update the config properties on the context instead of specifying command-line parameters.
Examples 3-59 and 3-60 illustrate two different ways to configure the port used by the thrift server.
./sbin/start-thriftserver.sh --hiveconf hive.server2.thrift.port=9090
hiveContext.setConf("hive.server2.thrift.port", "9090")
HiveThriftServer2.startWithContext(hiveContext)
When starting the JDBC server on an existing HiveContext, make sure to shut down the JDBC server when exiting.
The considerations for using DataFrames/Datasets over RDDs are complex and changing with the rapid development of Spark SQL.
One of the cases where Spark SQL can be difficult to use is when the number of partitions needed for different parts of your pipeline changes, or if you otherwise wish to control the partitioner.
While RDDs lack the Catalyst optimizer and relational-style queries, they are able to work with a wider variety of data types and provide more direct control over certain types of operations.
DataFrames and Datasets, by contrast, work only with a restricted subset of data types, but when your data is in one of these supported classes the performance improvements of using the Catalyst optimizer provide a compelling case for accepting those restrictions.
DataFrames can be used when you have primarily relational transformations, which can be extended with UDFs when necessary.
Compared to RDDs, DataFrames benefit from the efficient storage format of Spark SQL, the Catalyst optimizer, and the ability to perform certain operations directly on the serialized data.
One drawback to working with DataFrames is that they are not strongly typed at compile time, which can lead to errors with incorrect column access and other simple mistakes.
Datasets can be used when you want a mix of functional and relational transformations while benefiting from the optimizations for DataFrames and are, therefore, a great alternative to RDDs in many cases.
As with RDDs, Datasets are parameterized on the type of data contained in them, which allows for strong compile-time type checking but requires that you know your data type at compile time (although Row or another generic type can be used).
The additional type safety of Datasets can be beneficial even for applications that do not need the specific functionality of DataFrames.
One potential drawback is that the Dataset API is continuing to evolve, so updating to future versions of Spark may require code changes.
Pure RDDs work well for data that does not fit into the Catalyst optimizer.
RDDs have an extensive and stable functional API, and upgrades to newer versions of Spark are unlikely to require substantial code changes.
RDDs also make it easy to control partitioning, which can be very useful for many distributed algorithms.
Some types of operations, such as multicolumn aggregates, complex joins, and windowed operations, can be daunting to express with the RDD API.
RDDs can work with any Java- or Kryo-serializable data, although the serialization is often more expensive and less space efficient than the equivalent in DataFrames/Datasets.
Now that you have a good understanding of Spark SQL, it’s time to continue on to joins, for both RDDs and Spark SQL.
1 UDFs allow us to extend SQL to have additional powers, such as computing the geospatial distance between points.
2 A row at a time allows for narrow transformations with no shuffle.
3 A column literal is a column with a fixed value that doesn’t change between rows (i.e., constant).
4 StringIndexer in the ML pipeline is designed for string index encoding.
5 Some types may not be correctly implemented for all databases.
6 spark-csv is now included as part of Spark 2.0.
7 For example, only reading the required partitions when a filter matches one of our partitioning schemes.
8 As of this writing, the Sparkling Pandas project development is on hold but early releases still contain some interesting examples of using JVM code from Python.
9 Scala Quasi Quotes are part of Scala’s macro system.