Chapter 3. DataFrames, Datasets, and Spark SQL

Spark SQL and its DataFrames and Datasets interfaces are the future of Spark performance, with more efficient storage options, an advanced optimizer, and direct operations on serialized data. These components are very important for getting the best Spark performance (see Figure 3-1).

Figure 3-1. Relative performance for RDD versus DataFrames based on SimplePerfTest computing aggregate average fuzziness of pandas

These are relatively new components; Datasets were introduced in Spark 1.6, DataFrames in Spark 1.3, and the SQL engine in Spark 1.0. This chapter is focused on helping you learn how to best use Spark SQL’s tools and how to intermix Spark SQL with traditional Spark operations.

Warning

Spark’s DataFrames have very different functionality compared to traditional DataFrames like Pandas’ and R’s. While these all deal with structured data, it is important not to depend on your existing intuition surrounding DataFrames.

Like RDDs, DataFrames and Datasets represent distributed collections, with additional schema information not found in RDDs. This additional schema information is used to provide a more efficient storage layer (Tungsten), and in the optimizer (Catalyst) to perform additional optimizations. Beyond schema information, the operations performed on Datasets and DataFrames are such that the optimizer can inspect the logical meaning rather than arbitrary functions. DataFrames are Datasets of a special Row object, which doesn’t provide any compile-time type checking. The strongly typed Dataset API shines especially for use with more RDD-like functional operations. Compared to working with RDDs, DataFrames allow Spark’s optimizer to better understand our code and our data, which allows for a new class of optimizations we explore in “Query Optimizer”.

Warning

While Spark SQL, DataFrames, and Datasets provide many excellent enhancements, they still have some rough edges compared to traditional processing with “regular” RDDs. The Dataset API, being brand new at the time of this writing, is likely to experience some changes in future versions.

Getting Started with the SparkSession (or HiveContext or SQLContext)

Much as the SparkContext is the entry point for all Spark applications, and the StreamingContext is for all streaming applications, the SparkSession serves as the entry point for Spark SQL. Like with all of the Spark components, you need to import a few extra components as shown in Example 3-1.

Tip

If you are using the Spark Shell you will automatically get a SparkSession called spark to accompany the SparkContext called sc.

Example 3-1. Spark SQL imports
import org.apache.spark.sql.{Dataset, DataFrame, SparkSession, Row}
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
Warning

Scala’s type alias of DataFrame = Dataset[Row] is broken in Java—you must use Dataset<Row> instead.

SparkSession is generally created using the builder pattern, along with getOrCreate(), which will return an existing session if one is already running. The builder can take string-based configuration keys with config(key, value), and shortcuts exist for a number of common parameters. One of the more important shortcuts is enableHiveSupport(), which will give you access to Hive UDFs and does not require a Hive installation—but does require certain extra JARs (discussed in “Spark SQL Dependencies”). Example 3-2 shows how to create a SparkSession with Hive support. The enableHiveSupport() shortcut not only configures Spark SQL to use these Hive JARs, but it also eagerly checks that they can be loaded—leading to a clearer error message than setting configuration values by hand. In general, using the shortcuts listed in the API docs is advised when they are present, since no checking is done in the generic config interface.

Example 3-2. Create a SparkSession
    val session = SparkSession.builder()
      .enableHiveSupport()
      .getOrCreate()
    // Import the implicits, unlike in core Spark the implicits are defined
    // on the context.
    import session.implicits._
Warning

When using getOrCreate, if an existing session exists your configuration values may be ignored and you will simply get the existing SparkSession. Some options, like master, will also only apply if there is no existing SparkContext running; otherwise, the existing SparkContext will be used.

Before Spark 2.0, instead of the SparkSession, two separate entry points (HiveContext and SQLContext) were used for Spark SQL. The names of these entry points can be a bit confusing, and it is important to note that the HiveContext does not require a Hive installation. The primary reason to use the SQLContext is if you have conflicts with the Hive dependencies that cannot be resolved. The HiveContext has a more complete SQL parser compared to the SQLContext, as well as additional user-defined functions (UDFs).1 Example 3-4 shows how to create a legacy HiveContext. The SparkSession should be preferred when possible, followed by the HiveContext, then the SQLContext. Not all libraries, or even all Spark code, have been updated to take the SparkSession, and in some cases you will find functions that still expect a SQLContext or HiveContext.

If you need to construct one of the legacy interfaces (SQLContext or HiveContext) the additional imports in Example 3-3 will be useful.

Example 3-3. Spark SQL legacy imports
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver._
Tip

Getting a HiveContext or SQLContext from a SparkSession is not well supported outside of the org.apache.spark scope—however, getOrCreate can be used.

Example 3-4. Creating the HiveContext
    val hiveContext = new HiveContext(sc)
    // Import the implicits, unlike in core Spark the implicits are defined
    // on the context.
    import hiveContext.implicits._

Spark SQL Dependencies

Like the other components in Spark, using Spark SQL requires adding additional dependencies. If you have conflicts with the Hive JARs that you can’t fix through shading, you can limit yourself to the spark-sql JAR—although then you give up access to the Hive functionality that comes with the spark-hive JAR.

To enable Hive support in SparkSession or use the HiveContext you will need to add both Spark’s SQL and Hive components to your dependencies.

For Maven-compatible build systems, the coordinates for Spark’s SQL and Hive components in 2.2.0 are org.apache.spark:spark-sql_2.11:2.2.0 and org.apache.spark:spark-hive_2.11:2.2.0. Example 3-5 shows how to add them to a “regular” sbt build, and Example 3-6 shows the process for Maven users.

Example 3-5. Add Spark SQL and Hive component to “regular” sbt build
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.2.0",
  "org.apache.spark" %% "spark-hive" % "2.2.0")
Example 3-6. Add Spark SQL and Hive component to Maven pom file
 <dependency> <!-- Spark dependency -->
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.2.0</version>
</dependency>
<dependency> <!-- Spark dependency -->
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

Managing Spark Dependencies

While managing these dependencies by hand isn’t particularly challenging, mistakes can sometimes be made when updating versions. The sbt-spark-package plug-in can simplify managing Spark dependencies. This plug-in is normally used for creating community packages (discussed in “Creating a Spark Package”), but it can also assist in building software that depends on Spark. To add the plug-in to your sbt build you need to create a project/plugins.sbt file and make sure it contains the code in Example 3-7.

Example 3-7. Including sbt-spark-package in project/plugins.sbt
resolvers += "Spark Package Main Repo" at "https://dl.bintray.com/spark-packages/maven"

addSbtPlugin("org.spark-packages" % "sbt-spark-package" % "0.2.5")
Tip

If you are starting a new Spark project, there is a g8 template, holdenk/sparkProjectTemplate you can use to bootstrap your new project by running sbt new holdenk/sparkProjectTemplate.

For spark-packages to work you will need to specify a Spark version and at least one Spark component (core), which can be done in sbt settings as shown in Example 3-8.

Example 3-8. Configuring Spark version and “core” component
sparkVersion := "2.2.0"
sparkComponents ++= Seq("core")

Once you have sbt-spark-package installed and set up, you can add the Spark components by just adding SQL and Hive to your list of sparkComponents as shown in Example 3-9.

Example 3-9. Add Spark SQL and Hive component to sbt-spark-package build
sparkComponents ++= Seq("sql", "hive", "hive-thriftserver", "hive-thriftserver")

While it’s not required, if you do have an existing Hive Metastore to which you wish to connect with Spark, you can copy your hive-site.xml to Spark’s conf/ directory.

Tip

The default Hive Metastore version is 1.2.1. For other versions of the Hive Metastore you will need to set the spark.sql.hive.metastore.version property to the desired version, as well as set spark.sql.hive.metastore.jars to either “maven” (to have Spark retrieve the JARs) or the system path where the Hive JARs are present.
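For example, here is a minimal sketch of pointing a session at a different Hive Metastore and letting Spark fetch the matching JARs from Maven (the 2.1.0 version string is only illustrative):

    // Hypothetical: connect to a Hive 2.1.0 metastore, fetching the JARs via Maven.
    val session = SparkSession.builder()
      .enableHiveSupport()
      .config("spark.sql.hive.metastore.version", "2.1.0")
      .config("spark.sql.hive.metastore.jars", "maven")
      .getOrCreate()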

Avoiding Hive JARs

If you can’t include the Hive dependencies with your application, you can leave out Spark’s Hive component and instead create a SQLContext, as shown in Example 3-10. This provides much of the same functionality, but uses a less capable SQL parser and lacks certain Hive-based user-defined functions (UDFs) and user-defined aggregate functions (UDAFs).

Example 3-10. Creating the SQLContext
    val sqlContext = new SQLContext(sc)
    // Import the implicits, unlike in core Spark the implicits are defined
    // on the context.
    import sqlContext.implicits._

As with the core SparkContext and StreamingContext, the Hive/SQLContext is used to load your data. JSON is a very popular format, in part because it can be easily loaded in many languages, and is at least semi–human-readable. Some of the sample data we’ve included in the book is in JSON format for exactly these reasons. JSON is especially interesting since it lacks schema information, and Spark needs to do some work to infer the schema from our data. JSON can also be expensive to parse; in some simple cases the cost of parsing the input JSON data can be greater than the cost of the actual operation. We will cover the full loading and saving API for JSON in “JSON”, but to get started, let’s load a sample we can use to explore the schema (see Example 3-11).

Example 3-11. Load JSON sample
    val df1 = session.read.json(path)

Feel free to load your own JSON data, but if you don’t have any handy to test with, check out the examples GitHub resources directory. Now that you’ve got the JSON data loaded you can start by exploring what schema Spark has managed to infer for your data.

Basics of Schemas

The schema information, and the optimizations it enables, is one of the core differences between Spark SQL and core Spark. Inspecting the schema is especially useful for DataFrames since you don’t have the templated type you do with RDDs or Datasets. Schemas are normally handled automatically by Spark SQL, either inferred when loading the data or computed based on the parent DataFrames and the transformation being applied.

DataFrames expose the schema in both human-readable and programmatic formats. printSchema() will show us the schema of a DataFrame and is most commonly used when working in the shell to figure out what you are working with. This is especially useful for data formats, like JSON, where the schema may not be immediately visible by looking at only a few records or reading a header. For programmatic usage, you can get the schema by simply calling schema, which is often used in ML pipeline transformers. Since you are likely familiar with case classes and JSON, let’s examine how the equivalent Spark SQL schema would be represented in Examples 3-12 and 3-13.

Example 3-12. JSON data that would result in an equivalent schema
{"name":"mission","pandas":[{"id":1,"zip":"94110","pt":"giant", "happy":true,
        "attributes":[0.4,0.5]}]}
Example 3-13. Equivalent case class
case class RawPanda(id: Long, zip: String, pt: String,
                            happy: Boolean, attributes: Array[Double])
case class PandaPlace(name: String, pandas: Array[RawPanda])

Now with the case classes defined you can create a local instance, turn it into a Dataset, and print the schema as shown in Example 3-14, resulting in Example 3-15. The same can be done with the JSON data, but requires some configuration as discussed in “JSON”.

Example 3-14. Create a Dataset with the case class
  def createAndPrintSchema() = {
    val damao = RawPanda(1, "M1B 5K7", "giant", true, Array(0.1, 0.1))
    val pandaPlace = PandaPlace("toronto", Array(damao))
    val df = session.createDataFrame(Seq(pandaPlace))
    df.printSchema()
  }
Example 3-15. Sample schema information for nested structure (.printSchema())
root
 |-- name: string (nullable = true)
 |-- pandas: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = false)
 |    |    |-- zip: string (nullable = true)
 |    |    |-- pt: string (nullable = true)
 |    |    |-- happy: boolean (nullable = false)
 |    |    |-- attributes: array (nullable = true)
 |    |    |    |-- element: double (containsNull = false)

In addition to the human-readable schema, the schema information is also available for you to use programmatically. The programmatic schema is built up from StructField entries, as shown in Example 3-16.

Example 3-16. StructField case class
case class StructField(
    name: String,
    dataType: DataType,
    nullable: Boolean = true,
    metadata: Metadata = Metadata.empty)
....

Example 3-17 shows the same schema as Example 3-15, in machine-readable format.

Example 3-17. Sample schema information for nested structure (.schema())—manually formatted
org.apache.spark.sql.types.StructType = StructType(
  StructField(name,StringType,true),
  StructField(pandas,
    ArrayType(
      StructType(StructField(id,LongType,false),
                     StructField(zip,StringType,true),
                     StructField(pt,StringType,true),
                     StructField(happy,BooleanType,false),
                     StructField(attributes,ArrayType(DoubleType,false),true)),
                true),true))

From here you can dive into what this schema information means and look at how to construct more complex schemas. The first part is a StructType, which contains a list of fields. It’s important to note you can nest StructTypes, like how a case class can contain additional case classes. The fields in the StructType are defined with StructField, which specifies the name, type (see Tables 3-1 and 3-2 for a listing of types), and a Boolean indicating if the field may be null/missing.

Table 3-1. Basic Spark SQL types
Scala type           | SQL type      | Details
Byte                 | ByteType      | 1-byte signed integers (–128, 127)
Short                | ShortType     | 2-byte signed integers (–32768, 32767)
Int                  | IntegerType   | 4-byte signed integers (–2147483648, 2147483647)
Long                 | LongType      | 8-byte signed integers (–9223372036854775808, 9223372036854775807)
java.math.BigDecimal | DecimalType   | Arbitrary precision signed decimals
Float                | FloatType     | 4-byte floating-point number
Double               | DoubleType    | 8-byte floating-point number
Array[Byte]          | BinaryType    | Array of bytes
Boolean              | BooleanType   | true/false
java.sql.Date        | DateType      | Date without time information
java.sql.Timestamp   | TimestampType | Date with time information (second precision)
String               | StringType    | Character string values (stored as UTF8)

Table 3-2. Complex Spark SQL types
Scala type | SQL type | Details | Example
Array[T]   | ArrayType(elementType, containsNull) | Array of a single type of element; containsNull is true if any elements may be null. | Array[Int] => ArrayType(IntegerType, true)
Map[K, V]  | MapType(keyType, valueType, valueContainsNull) | Key/value map; valueContainsNull is true if any values may be null. | Map[String, Int] => MapType(StringType, IntegerType, true)
case class | StructType(List[StructField]) | Named fields of possibly heterogeneous types, similar to a case class or JavaBean. | case class Panda(name: String, age: Int) => StructType(List(StructField("name", StringType, true), StructField("age", IntegerType, true)))

Tip

As you saw in Example 3-17, you can nest StructFields and all of the complex Spark SQL types.
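As a hedged sketch (not one of the book’s numbered examples), the nested schema inferred in Example 3-15 could be built by hand like this:

    // Hand-built equivalent of the inferred schema from Example 3-15.
    import org.apache.spark.sql.types._

    val rawPandaType = StructType(List(
      StructField("id", LongType, nullable = false),
      StructField("zip", StringType, nullable = true),
      StructField("pt", StringType, nullable = true),
      StructField("happy", BooleanType, nullable = false),
      StructField("attributes", ArrayType(DoubleType, containsNull = false), nullable = true)))

    val pandaPlaceType = StructType(List(
      StructField("name", StringType, nullable = true),
      StructField("pandas", ArrayType(rawPandaType, containsNull = true), nullable = true)))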

Now that you’ve got an idea of how to understand and, if needed, specify schemas for your data, you are ready to start exploring the DataFrame interfaces.

Tip

Spark SQL schemas are eagerly evaluated, unlike the data underneath. If you find yourself in the shell and uncertain of what a transformation will do, try it and print the schema. See Example 3-15.

DataFrame API

Spark SQL’s DataFrame API allows us to work with DataFrames without having to register temporary tables or generate SQL expressions. The DataFrame API has both transformations and actions. The transformations on DataFrames are more relational in nature, with the Dataset API (covered next) offering a more functional-style API.

Transformations

Transformations on DataFrames are similar in concept to RDD transformations, but with a more relational flavor. Instead of specifying arbitrary functions, which the optimizer is unable to introspect, you use a restricted expression syntax so the optimizer can have more information. As with RDDs, we can broadly break down transformations into simple single DataFrame, multiple DataFrame, key/value, and grouped/windowed transformations.

Warning

Spark SQL transformations are only partially lazy; the schema is eagerly evaluated.

Simple DataFrame transformations and SQL expressions

Simple DataFrame transformations allow us to do most of the standard things one can do when working a row at a time.2 You can still do many of the same operations defined on RDDs, except using Spark SQL expressions instead of arbitrary functions. To illustrate this we will start by examining the different kinds of filter operations available on DataFrames.

DataFrame functions, like filter, accept Spark SQL expressions instead of lambdas. These expressions allow the optimizer to understand what the condition represents, and with filter, it can often be used to skip reading unnecessary records.

To get started, let’s look at a SQL expression to filter our data for unhappy pandas using our existing schema. The first step is looking up the column that contains this information. In our case it is happy, and for our DataFrame (called df) we access the column through the apply function (e.g., df("happy")). The filter expression requires the expression to return a boolean value; if you wanted to select happy pandas, the entire expression could simply be the column itself, since happy is already a boolean. However, since we want to find the unhappy pandas, we can check to see that happy isn’t true using the !== operator as shown in Example 3-18.

Example 3-18. Simple filter for unhappy pandas
    pandaInfo.filter(pandaInfo("happy") !== true)
Tip

To look up the column, we can either provide the column name on the specific DataFrame or use the implicit $ operator for column lookup. This is especially useful when the DataFrame is anonymous. The ! binary negation function can be used together with $ to simplify our expression from Example 3-18 down to df.filter(!$("happy")).

This illustrates how to access a specific column from a DataFrame. For accessing other structures inside of DataFrames, like nested structs, keyed maps, and array elements, use the same apply syntax. So, if the first element in the attributes array represents squishiness, and you only want very squishy pandas, you can access that element by writing df("attributes")(0) >= 0.5.
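For example, a minimal sketch of that filter (reusing the df name from the surrounding text) would be:

    // Keep only pandas whose first attribute ("squishiness") is at least 0.5.
    df.filter(df("attributes")(0) >= 0.5)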

Our expressions need not be limited to a single column; you can compare multiple columns in a filter expression. Complex filters like the one shown in Example 3-19 are more difficult to push down to the storage layer, so you may not see the same speedup over RDDs that you see with simpler filters.

Example 3-19. More complex filter
    pandaInfo.filter(
      pandaInfo("happy").and(pandaInfo("attributes")(0) > pandaInfo("attributes")(1))
    )
Warning

Spark SQL’s column operators are defined on the column class, so a filter containing the expression 0 >= df.col("friends") will not compile since Scala will use the >= defined on 0. Instead you would write df.col("friends") <= 0 or convert 0 to a column literal with lit.3
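Both of the compiling forms are sketched here (the "friends" column is just a hypothetical example carried over from the warning above):

    // Either put the Column on the left, or lift the literal with lit().
    df.filter(df.col("friends") <= 0)
    df.filter(lit(0) >= df.col("friends"))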

Spark SQL’s DataFrame API has a very large set of operators available. You can use all of the standard mathematical operators on floating points, along with the standard logical and bitwise operations (prefixed with bitwise to distinguish them from the logical versions). Columns use === and !== for equality to avoid conflict with Scala internals. For columns of strings, startsWith/endsWith, substr, like, and isNull are all available. The full set of operations is listed in org.apache.spark.sql.Column and covered in Table 3-3 and Table 3-4.

Table 3-3. Spark SQL Scala operators
Scala operator | Java equivalent | Input column types | Output type | Purpose | Sample | Result
!==  | notEqual   | Any        | Boolean | Check if expressions are not equal    | "hi" !== "bye" | true
%    | mod        | Numeric    | Numeric | Modulo                                | 10 % 5         | 0
&&   | and        | Boolean    | Boolean | Boolean and                           | true && false  | false
*    | multiply   | Numeric    | Numeric | Multiply expressions                  | 2 * 21         | 42
+    | plus       | Numeric    | Numeric | Sum expressions                       | 2 + 2          | 4
-    | minus      | Numeric    | Numeric | Subtraction                           | 2 - 2          | 0
-    | unary_-    | Numeric    | Numeric | Unary negation                        | -42            | -42
/    | divide     | Numeric    | Double  | Division                              | 43 / 2         | 21.5
<    | lt         | Comparable | Boolean | Less than                             | "a" < "b"      | true
<=   | leq        | Comparable | Boolean | Less than or equal to                 | "a" <= "a"     | true
===  | equalTo    | Any        | Any     | Equality test (unsafe on null values) | "a" === "a"    | true
<=>  | eqNullSafe | Any        | Any     | Equality test (safe on null values)   | "a" <=> "a"    | true
>    | gt         | Comparable | Boolean | Greater than                          | "a" > "b"      | false
>=   | geq        | Comparable | Boolean | Greater than or equal to              | "a" >= "b"     | false

Table 3-4. Spark SQL expression operators
Operator   | Input column types | Output type            | Purpose | Sample | Result
apply      | Complex types      | Type of field accessed | Get value from a complex type (e.g., structfield/map lookup or array index) | [1,2,3].apply(0)  | 1
bitwiseAND | Integral type a    | Same as input          | Computes bitwise and                                                         | 21.bitwiseAND(11) | 1
bitwiseOR  | Integral type a    | Same as input          | Computes bitwise or                                                          | 21.bitwiseOR(11)  | 31
bitwiseXOR | Integral type a    | Same as input          | Computes bitwise exclusive or                                                | 21.bitwiseXOR(11) | 30

a Integral types include ByteType, IntegerType, LongType, and ShortType.

Warning

Not all Spark SQL expressions can be used in every API call. For example, Spark SQL joins do not support complex operations, and filter requires that the expression result in a boolean value.

In addition to the operators directly specified on the column, an even larger set of functions on columns exists in org.apache.spark.sql.functions, some of which we cover in Tables 3-5, 3-6, and 3-7. For illustration, this example shows the values for each column at a specific row, but keep in mind that these functions are called on columns, not values.

Table 3-5. Spark SQL standard functions
Function name | Purpose | Input types | Example usage | Result
lit(value) | Convert a Scala value to a column literal | Column & Symbol                       | lit(1)               | Column(1)
array      | Create a new array column                 | Must all have the same Spark SQL type | array(lit(1), lit(2)) | array(1,2)
isNaN      | Check if not a number                     | Numeric                               | isNaN(lit(100.0))    | false
not        | Opposite value                            | Boolean                               | not(lit(true))       | false

Table 3-6. Spark SQL common mathematical expressions
Function name | Purpose | Input types | Example usage | Result
abs   | Absolute value  | Numeric  | abs(lit(-1))            | 1
sqrt  | Square root     | Numeric  | sqrt(lit(4))            | 2
acos  | Inverse cosine  | Numeric  | acos(lit(0.5))          | 1.04… a
asin  | Inverse sine    | Numeric  | asin(lit(0.5))          | 0.523… a
atan  | Inverse tangent | Numeric  | atan(lit(0.5))          | 0.46… a
cbrt  | Cube root       | Numeric  | cbrt(lit(8))            | 2
ceil  | Ceiling         | Numeric  | ceil(lit(8.5))          | 9
cos   | Cosine          | Numeric  | cos(lit(0.5))           | 0.877… a
sin   | Sine            | Numeric  | sin(lit(0.5))           | 0.479… a
tan   | Tangent         | Numeric  | tan(lit(0.5))           | 0.546… a
exp   | Exponent        | Numeric  | exp(lit(1.0))           | 2.718… a
floor | Floor           | Numeric  | floor(lit(8.5))         | 8
least | Minimum value   | Numerics | least(lit(1), lit(-10)) | -10

a Truncated for display purposes.

Table 3-7. Functions for use on Spark SQL arrays
Function name | Purpose | Example usage | Result
array_contains | If an array contains a value. | array_contains(lit(Array(2,3,-1)), 3) | true
sort_array     | Sort an array (ascending default). | sort_array(lit(Array(2,3,-1))) | Array(-1,2,3)
explode        | Create a row for each element in the array—often useful when working with nested JSON records. Either takes a column name or an additional function mapping from row to iterator of case classes. | explode(lit(Array(2,3,-1))) | Row(2), Row(3), Row(-1)

Beyond simply filtering out data, you can also produce a DataFrame with new columns or updated values in old columns. Spark uses the same expression syntax we discussed for filter, except instead of having to include a condition (like testing for equality), the results are used as values in the new DataFrame. To see how you can use select on complex and regular data types, Example 3-20 uses the Spark SQL explode function to turn an input DataFrame of PandaPlaces into a DataFrame of just PandaInfo as well as computing the “squishness” to “hardness” ratio of each panda.

Example 3-20. Spark SQL select and explode operators
    val pandaInfo = pandaPlace.explode(pandaPlace("pandas")){
      case Row(pandas: Seq[Row]) =>
        pandas.map{
          case (Row(
            id: Long,
            zip: String,
            pt: String,
            happy: Boolean,
            attrs: Seq[Double])) =>
            RawPanda(id, zip, pt, happy, attrs.toArray)
        }}
    pandaInfo.select(
      (pandaInfo("attributes")(0) / pandaInfo("attributes")(1))
        .as("squishyness"))
Tip

When you construct a sequence of operations, the generated column names can quickly become unwieldy, so the as or alias operators are useful to specify the resulting column name.

While all of these operations are quite powerful, sometimes the logic you wish to express is more easily encoded with if/else semantics. Example 3-21 is a simple example of this, and it encodes the different types of panda as a numeric value.4 The when and otherwise functions can be chained together to create the same effect.

Example 3-21. If/else in Spark SQL
  /**
    * Encodes pandaType to Integer values instead of String values.
    *
    * @param pandaInfo the input DataFrame
    * @return Returns a DataFrame of pandaId and integer value for pandaType.
    */
  def encodePandaType(pandaInfo: DataFrame): DataFrame = {
    pandaInfo.select(pandaInfo("id"),
      (when(pandaInfo("pt") === "giant", 0).
      when(pandaInfo("pt") === "red", 1).
      otherwise(2)).as("encodedType")
    )
  }

Specialized DataFrame transformations for missing and noisy data

Spark SQL also provides special tools for handling missing, null, and invalid data. By using isNaN or isNull along with filters, you can create conditions for the rows you want to keep. For example, if you have a number of different columns, perhaps with different levels of precision (some of which may be null), you can use coalesce(c1, c2, ..) to return the first nonnull column. Similarly, for numeric data, nanvl returns the first non-NaN value of its two arguments (e.g., nanvl(0/0, 3) results in 3). To simplify working with missing data, the na function on DataFrame gives us access to some common routines for handling missing data in DataFrameNaFunctions.
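A quick hedged sketch of the DataFrameNaFunctions routines on our running pandas example (the default value for the pandaSize column is just illustrative):

    // Drop rows where any column is null, or fill in a default pandaSize.
    val noMissing = pandas.na.drop()
    val defaulted = pandas.na.fill(Map("pandaSize" -> 0))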

Beyond row-by-row transformations

Sometimes applying a row-by-row decision, as you can with filter, isn’t enough. Spark SQL also allows us to select the unique rows by calling dropDuplicates, but as with the similar operation on RDDs (distinct), this can require a shuffle, so is often much slower than filter. Unlike with RDDs, dropDuplicates can optionally drop rows based on only a subset of the columns, such as an ID field, as shown in Example 3-22.

Example 3-22. Drop duplicate panda IDs
    pandas.dropDuplicates(List("id"))

This leads nicely into our next section on aggregates and groupBy since often the most expensive component of each is the shuffle.

Aggregates and groupBy

Spark SQL has many powerful aggregates, and thanks to its optimizer it can be easy to combine many aggregates into one single action/query. Like with Pandas’ DataFrames, groupBy returns special objects on which we can ask for certain aggregations to be performed. In pre-2.0 versions of Spark, this was a generic GroupedData, but in versions 2.0 and beyond, groupBy on DataFrames is the same as groupBy on Datasets.

Aggregations on Datasets have extra functionality, returning a GroupedDataset (in pre-2.0 versions of Spark) or a KeyValueGroupedDataset when grouped with an arbitrary function, and a RelationalGroupedDataset when grouped with a relational/Dataset DSL expression. The additional typed functionality is discussed in “Grouped Operations on Datasets”, and the common “untyped” DataFrame and Dataset groupBy functionality is explored here.

min, max, avg, and sum are all implemented as convenience functions directly on GroupedData, and more can be specified by providing the expressions to agg. Example 3-23 shows how to compute the maximum panda size by zip code. Once you specify the aggregates you want to compute, you can get the results back as a DataFrame.

Tip

If you’re used to RDDs you might be concerned by groupBy, but it is now a safe operation on DataFrames thanks to the Spark SQL optimizer, which automatically pipelines our reductions, avoiding giant shuffles and mega records.

Example 3-23. Compute the max panda size by zip code
  def maxPandaSizePerZip(pandas: DataFrame): DataFrame = {
    pandas.groupBy(pandas("zip")).max("pandaSize")
  }

While Example 3-23 computes the max on a per-key basis, these aggregates can also be applied over the entire DataFrame or all numeric columns in a DataFrame. This is often useful when trying to collect some summary statistics for the data with which you are working. In fact, there is a built-in describe transformation which does just that, although it can also be limited to certain columns, which is used in Example 3-24 and returns Example 3-25.

Example 3-24. Compute some common summary stats, including count, mean, stddev, and more, on the entire DataFrame
    // Compute the count, mean, stddev, min, max summary stats for all
    // of the numeric fields of the provided panda infos. non-numeric
    // fields (such as string (name) or array types) are skipped.
    val df = pandas.describe()
    // Collect the summary back locally
    println(df.collect())
Example 3-25. Result of describe and collect on some small sample data (note: summarizes all of the numeric fields)
Array([count,3,3], [mean,1.3333333333333333,5.0],
  [stddev,0.5773502691896258,4.358898943540674], [min,1,2], [max,2,10])
Warning

The behavior of groupBy has changed between Spark versions. Prior to Spark 1.3 the values of the grouping columns are discarded by default, while post 1.3 they are retained. The configuration parameter, spark.sql.retainGroupColumns, can be set to false to force the earlier functionality.

For computing multiple different aggregations, or more complex aggregations, you should use the agg API on the GroupedData instead of directly calling count, mean, or similar convenience functions. For the agg API, you either supply a list of aggregate expressions, a string representing the aggregates, or a map of column names to aggregate function names. Once we’ve called agg with the requested aggregates, we get back a regular DataFrame with the aggregated results. As with regular functions, they are listed in the org.apache.spark.sql.functions Scaladoc. Table 3-8 lists some common and useful aggregates. For the example results in these tables we will consider a DataFrame with a name field (as a string) and an age field (as an integer), both nullable, holding the values ({"ikea", null}, {"tube", 6}, {"real", 30}). Example 3-26 shows how to compute both the min and mean for the pandaSize column on our running panda example.

Tip

Computing multiple aggregates with Spark SQL can be much simpler than doing the same tasks with the RDD API.

Example 3-26. Example aggregates using the agg API
  def minMeanSizePerZip(pandas: DataFrame): DataFrame = {
    // Compute the min and mean
    pandas.groupBy(pandas("zip")).agg(
      min(pandas("pandaSize")), mean(pandas("pandaSize")))
  }
Table 3-8. Spark SQL aggregate functions for use with agg API
Function name | Purpose | Storage requirement | Input types | Example usage | Example result
approxCountDistinct | Count approximate distinct values in column a | Configurable through rsd (which controls error rate) | All | df.agg(approxCountDistinct(df("age"), 0.001)) | 2
avg           | Average                                                                  | Constant          | Numeric       | df.agg(avg(df("age")))           | 18
count         | Count number of items (excluding nulls). Special case of “*” counts number of rows | Constant | All          | df.agg(count(df("age")))         | 2
countDistinct | Count distinct values in column                                          | O(distinct elems) | All           | df.agg(countDistinct(df("age"))) | 2
first         | Return the first element b                                               | Constant          | All           | df.agg(first(df("age")))         | 6
last          | Return the last element                                                  | Constant          | All           | df.agg(last(df("age")))          | 30
stddev        | Sample standard deviation c                                              | Constant          | Numeric       | df.agg(stddev(df("age")))        | 16.97…
stddev_pop    | Population standard deviation c                                          | Constant          | Numeric       | df.agg(stddev_pop(df("age")))    | 12.0
sum           | Sum of the values                                                        | Constant          | Numeric       | df.agg(sum(df("age")))           | 36
sumDistinct   | Sum of the distinct values                                               | O(distinct elems) | Numeric       | df.agg(sumDistinct(df("age")))   | 36
min           | Select the minimum value                                                 | Constant          | Sortable data | df.agg(min(df("age")))           | 6
max           | Select the maximum value                                                 | Constant          | Sortable data | df.agg(max(df("age")))           | 30
mean          | Select the mean value                                                    | Constant          | Numeric       | df.agg(mean(df("age")))          | 18

a Implemented with HyperLogLog: https://en.wikipedia.org/wiki/HyperLogLog.

b This was commonly used in early versions of Spark SQL where the grouping column was not preserved.

c Added in Spark 1.6.

Tip

In addition to using aggregates on groupBy, you can run the same aggregations on multidimensional cubes with cube and rollups with rollup.
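A minimal sketch (not one of the book’s numbered examples), reusing the max aggregate from Example 3-23 over a cube and a rollup of zip and panda type:

    // Max panda size for every combination of zip and pt, plus subtotal rows.
    pandas.cube(pandas("zip"), pandas("pt")).max("pandaSize")
    pandas.rollup(pandas("zip"), pandas("pt")).max("pandaSize")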

If the built-in aggregation functions don’t meet your needs, you can extend Spark SQL using UDFs as discussed in “Extending with User-Defined Functions and Aggregate Functions (UDFs, UDAFs)”, although things can be more complicated for aggregate functions.

Windowing

Spark SQL 1.4.0 introduced windowing functions to allow us to more easily work with ranges or windows of rows. When creating a window you specify what columns the window is over, the order of the rows within each partition/group, and the size of the window (e.g., K rows before and J rows after OR range between values). If it helps to think of this visually, Figure 3-2 shows a sample window and its results. Using this specification each input row is related to some set of rows, called a frame, that is used to compute the resulting aggregate. Window functions can be very useful for things like computing average speed with noisy data, relative sales, and more. A window for pandas by age is shown in Example 3-27.

Example 3-27. Define a window on the +/-10 closest (by age) pandas in the same zip code
    val windowSpec = Window
      .orderBy(pandas("age"))
      .partitionBy(pandas("zip"))
      .rowsBetween(start = -10, end = 10) // can use rangeBetween for range instead
Figure 3-2. Spark SQL windowing

Once you’ve defined a window specification you can compute a function over it, as shown in Example 3-28. Spark’s existing aggregate functions, covered in “Aggregates and groupBy”, can be computed on an aggregation over the window. Window operations are very useful for things like Kalman filtering or many types of relative analysis.

Example 3-28. Compute difference from the average using the window of +/-10 closest (by age) pandas in the same zip code
    val pandaRelativeSizeCol = pandas("pandaSize") -
      avg(pandas("pandaSize")).over(windowSpec)

    pandas.select(pandas("name"), pandas("zip"), pandas("pandaSize"), pandas("age"),
      pandaRelativeSizeCol.as("panda_relative_size"))
Warning

As of this writing, windowing functions require Hive support to be enabled or the use of a HiveContext.

Sorting

Sorting supports multiple columns in ascending or descending order, with ascending as the default. These sort orders can be intermixed, as shown in Example 3-29. Spark SQL has some extra benefits for sorting as some serialized data can be compared without deserialization.

Example 3-29. Sort by panda age and size in opposite orders
    pandas.orderBy(pandas("pandaSize").asc, pandas("age").desc)

Sorting is often combined with limiting results in order to bring back only the top or bottom K rows; you restrict the number of rows in the DataFrame with limit(numRows). Limits are also sometimes used without sorting, for debugging, to bring back a small result. If, instead of limiting the number of rows based on a sort order, you want to sample your data, “Sampling” covers techniques for Spark SQL sampling as well.
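For example, a quick sketch of bringing back only the ten largest pandas:

    // Top 10 pandas by size.
    pandas.orderBy(pandas("pandaSize").desc).limit(10)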

Multi-DataFrame Transformations

Beyond single DataFrame transformations you can perform operations that depend on multiple DataFrames. The ones that first pop into our heads are most likely the different types of joins, which are covered in Chapter 4, but beyond that you can also perform a number of set-like operations between DataFrames.

Set-like operations

The DataFrame set-like operations allow us to perform many operations that are most commonly thought of as set operations. These operations behave a bit differently than traditional set operations since we don’t have the restriction of unique elements. While you are likely already familiar with the results of set-like operations from regular Spark and Learning Spark, it’s important to review the cost of these operations in Table 3-9.

Table 3-9. Set operations
Operation name | Cost
unionAll  | Low
intersect | Expensive
except    | Expensive
distinct  | Expensive
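As a minimal sketch of the operations in Table 3-9 (df1 and df2 are hypothetical DataFrames with the same schema):

    val combined = df1.unionAll(df2) // renamed union in Spark 2.0+
    val common = df1.intersect(df2)
    val difference = df1.except(df2)
    val deduplicated = df1.distinct()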

Plain Old SQL Queries and Interacting with Hive Data

Sometimes it’s better to use regular SQL queries instead of building up our operations on DataFrames. If you are connected to a Hive Metastore you can directly write SQL queries against the Hive tables and get the results as a DataFrame. If you have a DataFrame you want to write SQL queries against, you can register it as a temporary table, as shown in Example 3-30 (or save it as a managed table if you intend to reuse it between jobs). Datasets can also be converted back to DataFrames and registered for querying against.

Example 3-30. Registering/saving tables
  def registerTable(df: DataFrame): Unit = {
    df.registerTempTable("pandas")
    df.write.saveAsTable("perm_pandas")
  }

Querying tables is the same, regardless of whether it is a temporary table, existing Hive table, or newly saved Spark table, and is illustrated in Example 3-31.

Example 3-31. Querying a table (permanent or temporary)
  def querySQL(): DataFrame = {
    sqlContext.sql("SELECT * FROM pandas WHERE size > 0")
  }

In addition to registering tables you can also write queries directly against a specific file path, as shown in Example 3-32.

Example 3-32. Querying a raw file
  def queryRawFile(): DataFrame = {
    sqlContext.sql("SELECT * FROM parquet.`path_to_parquet_file`")
  }

Data Representation in DataFrames and Datasets

DataFrames are more than RDDs of Row objects; DataFrames and Datasets have a specialized representation and columnar cache format. The specialized representation is not only more space efficient, but also can be much faster to encode than even Kryo serialization. To be clear, like RDDs, DataFrames and Datasets are generally lazily evaluated and build up a lineage of their dependencies (except in DataFrames this is called a logical plan and contains more information).

Tungsten

Tungsten is a new Spark SQL component that provides more efficient Spark operations by working directly at the byte level. Looking back on Figure 3-1, we can take a closer look at the space differences between the RDDs and DataFrames when cached in Figure 3-3. Tungsten includes specialized in-memory data structures tuned for the types of operations required by Spark, improved code generation, and a specialized wire protocol.

Figure 3-3. RDD versus DataFrame storage space for same data
Tip

For those coming from Hadoop, you can think of Tungsten data types as being WritableComparable types on steroids.

Tungsten’s representation is substantially smaller than objects serialized using Java or even Kryo serializers. As Tungsten does not depend on Java objects, both on-heap and off-heap allocations are supported. Not only is the format more compact, but serialization times can be substantially faster than with native serialization.

Tip

Since Tungsten no longer depends on working with Java objects, you can use either on-heap (in the JVM) or off-heap storage. If you use off-heap storage, it is important to leave enough room in your containers for the off-heap allocations, which you can get an approximate idea for from the web UI.

Tungsten’s data structures are also designed with the kinds of processing for which they are used closely in mind. The classic example of this is with sorting, a common and expensive operation. The on-wire representation is implemented so that sorting can be done without having to deserialize the data again.

Tip

In the future Tungsten may make it more feasible to use certain non-JVM libraries. For many simple operations the cost of using BLAS, or similar linear algebra packages, from the JVM is dominated by the cost of copying the data off-heap.

By avoiding the memory and GC overhead of regular Java objects, Tungsten is able to process larger datasets than the same handwritten aggregations. Tungsten became the default in Spark 1.5 and can be enabled in earlier versions by setting spark.sql.tungsten.enabled to true (or disabled in later versions by setting this to false). Even without Tungsten, Spark SQL uses a columnar storage format with Kryo serialization to minimize storage cost.

Data Loading and Saving Functions

Spark SQL has a different way of loading and saving data than core Spark. To be able to push down certain types of operations to the storage layer, Spark SQL has its own Data Source API. Data sources are able to specify and control which types of operations should be pushed down to the data source. As developers, you don’t need to worry too much about the internal activity going on here, unless the data sources you are looking for are not supported.

Tip

Data loading in Spark SQL is not quite as lazy as in regular Spark, but is still generally lazy. You can verify this by quickly trying to load from a data source that doesn’t exist.

DataFrameWriter and DataFrameReader

The DataFrameWriter and the DataFrameReader cover writing and reading from external data sources. The DataFrameWriter is accessed by calling write on a DataFrame or Dataset. The DataFrameReader can be accessed through read on a SQLContext.

Tip

Spark SQL updated the load/save API in Spark 1.4, so you may see code still using the old-style API without the DataFrame reader or writer classes, but under the hood it is implemented as a wrapper around the new API.

Formats

When reading or writing you specify the format by calling format(formatName) on the DataFrameWriter/DataFrameReader. Format-specific parameters, such as number of records to be sampled for JSON, are specified by either providing a map of options with options or setting option-by-option with option on the reader/writer.
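As a hedged sketch of the generic reader API (the samplingRatio option applies to the JSON format, and path is assumed to point at your data):

    // Generic load: the format name and per-format options are plain strings.
    val df = session.read
      .format("json")
      .options(Map("samplingRatio" -> "0.5"))
      .load(path)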

Tip

The first-party formats JSON, JDBC, ORC, and Parquet methods are directly defined on the reader/writers taking the path or connection info. These methods are for convenience only and are wrappers around the more general methods we illustrate in this chapter.

JSON

Loading and writing JSON is supported directly in Spark SQL, and despite the lack of schema information in JSON, Spark SQL is able to infer a schema for us by sampling the records. Loading JSON data is more expensive than loading many data sources, since Spark needs to read some of the records to determine the schema information. If the schema between records varies widely (or the number of records is very small), you can increase the percentage of records read to determine the schema by setting samplingRatio to a higher value, as in Example 3-33 where we set the sample ratio to 100%.

Example 3-33. Load JSON data, using all (100%) of records to determine the schema
    val df2 = session.read.format("json")
      .option("samplingRatio", "1.0").load(path)
Tip

Spark’s schema inference can be a compelling reason to use Spark for processing JSON data, even if the data size could be handled on a single node.

Since our input may contain some invalid JSON records we may wish to filter out, we can also take in an RDD of strings. This allows us to load the input as a standard text file, filter out our invalid records, and then load the data into JSON. This is done by using the built-in json function on the DataFrameReader, which takes RDDs or paths and is shown in Example 3-34. Methods for converting RDDs of regular objects are covered in “RDDs”.

Example 3-34. jsonRDD load
    val rdd: RDD[String] = input.filter(_.contains("panda"))
    val df = session.read.json(rdd)

JDBC

The JDBC data source represents a natural Spark SQL data source, one that supports many of the same operations. Since different database vendors have slightly different JDBC implementations, you need to add the JAR for your JDBC data sources. Since SQL field types vary as well, Spark uses JdbcDialects with built-in dialects for DB2, Derby, MsSQL, MySQL, Oracle, and Postgres.5

While Spark supports many different JDBC sources, it does not ship with the JARs required to talk to all of these databases. If you are submitting your Spark job with spark-submit you can download the required JARs to the host you are launching and include them by specifying --jars or supply the Maven coordinates to --packages. Since the Spark Shell is also launched this way, the same syntax works and you can use it to include the MySQL JDBC JAR in Example 3-35.

Example 3-35. Include MySQL JDBC JAR
spark-submit --jars ./resources/mysql-connector-java-5.1.38.jar $ASSEMBLY_JAR $CLASS
Warning

In earlier versions of Spark --jars does not include the JAR in the driver’s class path. If this is the case for your cluster you must also specify the same JAR to --driver-class-path.

JdbcDialects allow Spark to correctly map the JDBC types to the corresponding Spark SQL types. If there isn’t a JdbcDialect for your database vendor, the default dialect will be used, which will likely work for many of the types. The dialect is automatically chosen based on the JDBC URL used.

Tip

If you find yourself needing to customize the JdbcDialect for your database vendor, you can look for a package on Spark Packages or extend the JdbcDialect class and register your own dialect.
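A minimal sketch of what registering your own dialect might look like (the URL prefix and quoting rule here are purely illustrative):

    import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

    // Hypothetical dialect for databases whose JDBC URLs start with "jdbc:example".
    object ExampleDialect extends JdbcDialect {
      override def canHandle(url: String): Boolean = url.startsWith("jdbc:example")
      override def quoteIdentifier(colName: String): String = s"`$colName`"
    }
    JdbcDialects.registerDialect(ExampleDialect)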

As with the other built-in data sources, there exists a convenience wrapper for specifying the properties required to load JDBC data, illustrated in Example 3-36. The convenience wrapper JDBC accepts the URL, table, and a java.util.Properties object for connection properties (such as authentication information). The properties object is merged with the properties that are set on the reader/writer itself. While the properties object is required, an empty properties object can be provided and properties instead specified on the reader/writer.

Example 3-36. Create a DataFrame from a JDBC data source
    session.read.jdbc("jdbc:dialect:serverName;user=user;password=pass",
      "table", new Properties)

    session.read.format("jdbc")
      .option("url", "jdbc:dialect:serverName")
      .option("dbtable", "table").load()

The API for saving a DataFrame is very similar to the API used for loading. The save() function needs no path since the information is already specified, as illustrated in Example 3-37, just as with loading.

Example 3-37. Write a DataFrame to a JDBC data source
    df.write.jdbc("jdbc:dialect:serverName;user=user;password=pass",
      "table", new Properties)

    df.write.format("jdbc")
      .option("url", "jdbc:dialect:serverName")
      .option("user", "user")
      .option("password", "pass")
      .option("dbtable", "table").save()

In addition to reading and writing JDBC data sources, Spark SQL can also run its own JDBC server (covered in “JDBC/ODBC Server”).

Parquet

Apache Parquet files are a common format directly supported in Spark SQL, and they are incredibly space-efficient and popular. Apache Parquet’s popularity comes from a number of features, including the ability to easily split across multiple files, compression, nested types, and many others discussed in the Parquet documentation. Since Parquet is such a popular format, there are some additional options available in Spark for the reading and writing of Parquet files. These options are listed in Table 3-10. Unlike third-party data sources, these options are mostly configured on the SQLContext, although some can be configured on either the SQLContext or DataFrameReader/Writer.

Table 3-10. Parquet data source options
SQLConf | DataFrameReader/Writer option | Default | Purpose
spark.sql.parquet.mergeSchema | mergeSchema | False | Control if schema should be merged between partitions when reading. Can be expensive, so disabled by default in 1.5.0.
spark.sql.parquet.binaryAsString | N/A | False | Treat binary data as strings. Old versions of Spark wrote strings as binary data.
spark.sql.parquet.cacheMetadata | N/A | True | Cache Parquet metadata; normally safe unless the underlying data is being modified by another process.
spark.sql.parquet.compression.codec | N/A | Gzip | Specify the compression codec for use with Parquet data. Valid options are uncompressed, snappy, gzip, or lzo.
spark.sql.parquet.filterPushdown | N/A | True | Push down filters to Parquet (when possible).a
spark.sql.parquet.writeLegacyFormat | N/A | False | Write the Parquet metadata in the legacy format.
spark.sql.parquet.output.committer.class | N/A | org.apache.parquet.hadoop.ParquetOutputCommitter | Output committer used by Parquet. If writing to S3 you may wish to try org.apache.spark.sql.parquet.DirectParquetOutputCommitter.

a Pushdown means evaluate at the storage layer, so with Parquet this can often mean skipping reading unnecessary rows or files.

Reading Parquet from an old version of Spark requires some special options, as shown in Example 3-38.

Example 3-38. Read Parquet file written by an old version of Spark
  def loadParquet(path: String): DataFrame = {
    // Configure Spark to read binary data as string,
    // note: must be configured on session.
    session.conf.set("spark.sql.parquet.binaryAsString", "true")

    // Load parquet data using merge schema (configured through option)
    session.read
      .option("mergeSchema", "true")
      .format("parquet")
      .load(path)
  }

Writing parquet with the default options is quite simple, as shown in Example 3-39.

Example 3-39. Write Parquet file with default options
  def writeParquet(df: DataFrame, path: String) = {
    df.write.format("parquet").save(path)
  }

Hive tables

Interacting with Hive tables adds another option beyond the other formats. As covered in “Plain Old SQL Queries and Interacting with Hive Data”, one option for bringing in data from a Hive table is writing a SQL query against it and having the result as a DataFrame. The DataFrame’s reader and writer interfaces can also be used with Hive tables, as with the rest of the data sources, as illustrated in Example 3-40.

Example 3-40. Load a Hive table
  def loadHiveTable(): DataFrame = {
    session.read.table("pandas")
  }
Warning

When loading a Hive table Spark SQL will convert the metadata and cache the result. If the underlying metadata has changed you can use sqlContext.refreshTable("tablename") to update the metadata, or the caching can be disabled by setting spark.sql.parquet.cacheMetadata to false.

Saving a managed table is a bit different, and is illustrated in Example 3-41.

Example 3-41. Write managed table
  def saveManagedTable(df: DataFrame): Unit = {
    df.write.saveAsTable("pandas")
  }
Warning

Unless specific conditions are met, the result saved to a Hive managed table will be saved in a Spark-specific format that other tools may not be able to understand.

RDDs

Spark SQL DataFrames can easily be converted to RDDs of Row objects, and can also be created from RDDs of Row objects as well as JavaBeans, Scala case classes, and tuples. For RDDs of strings in JSON format, you can use the methods discussed in “JSON”. Datasets of type T can also easily be converted to RDDs of type T, which can provide a useful bridge for DataFrames to RDDs of concrete case classes instead of Row objects. RDDs are a special-case data source, since when going to/from RDDs, the data remains inside of Spark without writing out to or reading from an external system.

Tip

Converting a DataFrame to an RDD is a transformation (not an action); however, converting an RDD to a DataFrame or Dataset may involve computing (or sampling some of) the input RDD.

Warning

Creating a DataFrame from an RDD is not free in the general case. The data must be converted into Spark SQL’s internal format.

When you create a DataFrame from an RDD, Spark SQL needs to add schema information. If you are creating the DataFrame from an RDD of case classes or plain old Java objects (POJOs), Spark SQL is able to use reflection to automatically determine the schema, as shown in Example 3-42. You can also manually specify the schema for your data using the structure discussed in “Basics of Schemas”. This can be especially useful if some of your fields are not nullable. You must specify the schema yourself if Spark SQL is unable to determine the schema through reflection, such as an RDD of Row objects (perhaps from calling .rdd on a DataFrame to use a functional transformation, as shown in Example 3-42).

Example 3-42. Creating DataFrames from RDDs
  def createFromCaseClassRDD(input: RDD[PandaPlace]) = {
    // Create DataFrame explicitly using session and schema inference
    val df1 = session.createDataFrame(input)

    // Create DataFrame using session implicits and schema inference
    val df2 = input.toDF()

    // Create a Row RDD from our RDD of case classes
    val rowRDD = input.map(pm => Row(pm.name,
      pm.pandas.map(pi => Row(pi.id, pi.zip, pi.happy, pi.attributes))))

    val pandasType = ArrayType(StructType(List(
      StructField("id", LongType, true),
      StructField("zip", StringType, true),
      StructField("happy", BooleanType, true),
      StructField("attributes", ArrayType(FloatType), true))))

    // Create DataFrame explicitly with specified schema
    val schema = StructType(List(StructField("name", StringType, true),
      StructField("pandas", pandasType)))

    val df3 = session.createDataFrame(rowRDD, schema)
  }
Warning

Case classes or JavaBeans defined inside another class can sometimes cause problems. If your RDD conversion is failing, make sure the case class being used isn’t defined inside another class.

Converting a DataFrame to an RDD is incredibly simple; however, you get an RDD of Row objects, as shown in Example 3-43. Since a row can contain anything, you need to specify the type (or cast the result) as you fetch the values for each column in the row. With Datasets you can directly get back an RDD templated on the same type, which can make the conversion back to a useful RDD much simpler.

Warning

While Scala has many implicit conversions for different numeric types, these do not generally apply in Spark SQL; instead, we use explicit casting.

Example 3-43. Convert a DataFrame
  def toRDD(input: DataFrame): RDD[RawPanda] = {
    val rdd: RDD[Row] = input.rdd
    rdd.map(row => RawPanda(row.getAs[Long](0), row.getAs[String](1),
      row.getAs[String](2), row.getAs[Boolean](3), row.getAs[Array[Double]](4)))
  }
Tip

If you know that the schema of your DataFrame matches that of another, you can use the existing schema when constructing your new DataFrame. One common place where this occurs is when an input DataFrame has been converted to an RDD for functional filtering and then back.
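A minimal sketch of that round trip (assuming a boolean happy column, as in our running panda example):

    def functionalFilter(df: DataFrame): DataFrame = {
      // Drop to an RDD for a functional filter, then reuse the existing schema.
      val filteredRDD = df.rdd.filter(row => row.getAs[Boolean]("happy"))
      df.sparkSession.createDataFrame(filteredRDD, df.schema)
    }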

Local collections

Much like with RDDs, you can also create DataFrames from local collections and bring them back as local collections, as illustrated in Example 3-44. The same memory requirements apply; namely, the entire contents of the DataFrame will be in-memory in the driver program. As such, distributing local collections is normally limited to unit tests, or joining small datasets with larger distributed datasets.

Example 3-44. Creating from a local collection
  def createFromLocal(input: Seq[PandaPlace]) = {
    session.createDataFrame(input)
  }
Tip

The LocalRelation API we used here allows us to specify a schema in the same manner as when we are converting an RDD to a DataFrame.
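For instance, a sketch of creating a DataFrame from a local collection of Rows with an explicit schema; this assumes the java.util.List overload of createDataFrame, so the Seq is converted first:

  def createFromLocalWithSchema(rows: Seq[Row], schema: StructType): DataFrame = {
    import scala.collection.JavaConverters._
    // Passing the schema explicitly avoids inference and lets us control nullability.
    session.createDataFrame(rows.asJava, schema)
  }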

Warning

In pre-1.6 versions of PySpark, schema inference only looked at the first record.

Collecting data back as a local collection is more common and is usually done after aggregating or filtering the data; for example, ML pipelines collect model coefficients to the driver, and our Goldilocks example in Chapter 6 collects the quantiles. Example 3-45 shows how to collect a DataFrame back locally. For larger datasets, saving to an external storage system (such as a database or HDFS) is recommended.

Warning

Just as with RDDs, do not collect large DataFrames back to the driver. For Python users, it is important to remember that toPandas() collects the data locally.

Example 3-45. Collecting the result locally
  def collectDF(df: DataFrame) = {
    val result: Array[Row] = df.collect()
    result
  }

Additional formats

As with core Spark, the data formats that ship directly with Spark only begin to scratch the surface of the types of systems with which you can interact. Some vendors publish their own implementations, and many are published on Spark Packages. As of this writing there are over twenty formats listed on the Data Sources page, with the most popular being Avro, Redshift, CSV,6 and a unified wrapper around 6+ databases called deep-spark.

Spark packages can be included in your application in a few different ways. During the exploration phase (e.g., using the shell) you can include them by specifying --packages on the command line, as in Example 3-46. The same approach can be used when submitting your application with spark-submit, but this only includes the package at runtime, not at compile time. For including at compile time you can add the Maven coordinates to your builds, or, if building with sbt, the sbt-spark-package plug-in simplifies package dependencies with spDependencies. Otherwise, manually listing them as in Example 3-47 works quite well.

Tip

Spark CSV is now included as part of Spark 2.0+, so you only need to include this for earlier versions of Spark.

Example 3-46. Starting Spark shell with CSV support
./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.5.0
Example 3-47. Include spark-csv as an sbt dependency
"com.databricks" % "spark-csv_2.11" % "1.5.0"

Once you’ve included the package with your Spark job, you need to specify the format, as you did with the Spark-provided ones. The format name should be mentioned in the package’s documentation. For spark-csv you would specify a format string of com.databricks.spark.csv. For the built-in CSV format (in Spark 2.0+) you would instead just use csv (or the full name org.apache.spark.sql.csv).
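For example, a minimal sketch of loading CSV data both ways (the path and the header option are assumptions about your input):

  def loadCsv(path: String): DataFrame = {
    // With the spark-csv package (pre-2.0) you would use its full format name:
    // session.read.format("com.databricks.spark.csv").option("header", "true").load(path)
    // With the built-in CSV support in Spark 2.0+ the short name is enough:
    session.read.format("csv").option("header", "true").load(path)
  }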

There are a few options if the data format you are looking for isn’t directly supported in either Spark or one of the libraries. Since many formats are available as Hadoop input formats, you can try to load your data as a Hadoop input format and convert the resulting RDD as discussed in “RDDs”. This approach is relatively simple, but means Spark SQL is unable to push down operations to our data store.7

For a deeper integration you can implement your data source using the Data Source API. Depending on which operations you wish to support operator push-down for, in your base relation you will need to implement additional traits from the org.apache.spark.sql.sources package. The details of implementing a new Spark SQL data source are beyond the scope of this book, but if you are interested the Scaladoc for org.apache.spark.sql.sources and spark-csv’s CsvRelation can be good ways to get started.

Save Modes

In core Spark, saving RDDs always requires that the target directory does not exist, which can make appending to existing tables challenging. With Spark SQL, you can specify the desired behavior when writing out to a path that may already have data. The default behavior is SaveMode.ErrorIfExists; matching the behavior of RDDs, Spark will throw an exception if the target already exists. The different save modes and their behaviors are listed in Table 3-11. Example 3-48 illustrates how to configure an alternative save mode.

Table 3-11. Save modes

ErrorIfExists
  Throws an exception if the target already exists. If the target doesn’t exist, writes the data out.

Append
  If the target already exists, appends the data to it. If the target doesn’t exist, writes the data out.

Overwrite
  If the target already exists, deletes the target and writes the data out.

Ignore
  If the target already exists, silently skips writing the data out. Otherwise writes the data out.

Example 3-48. Specify save mode of append
  def writeAppend(input: DataFrame): Unit = {
    input.write.mode(SaveMode.Append).save("output/")
  }

Partitions (Discovery and Writing)

Partitioning your data is an important part of Spark SQL, since it powers one of the key optimizations: reading only the required data, discussed more in “Logical and Physical Plans”. If you know how your downstream consumers may access your data (e.g., reading data based on zip code), it is beneficial to use that information to partition your output when you write it. When reading the data, it’s useful to understand how partition discovery functions, so you can have a better understanding of whether your filter can be pushed down.

Tip

Filter push-down can make a huge difference when working with large datasets by allowing Spark to only access the subset of data required for your computation instead of doing effectively a full table scan.

When reading partitioned data, you point Spark to the root path of your data, and it will automatically discover the different partitions. Not all data types can be used as partition keys; currently only string and numeric columns are supported.
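For example, a sketch of reading JSON data partitioned by a zipcode column (as written out in Example 3-49) and filtering on the partition column; the path and zip code value are hypothetical:

  def readOneZip(root: String): DataFrame = {
    import session.implicits._
    // Pointing at the root path lets Spark discover the zipcode=... directories as
    // a partition column, so this filter can prune the partitions that are read.
    session.read.format("json").load(root).filter($"zipcode" === "94110")
  }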

If your data is all in a single DataFrame, the DataFrameWriter API makes it easy to specify the partition information while you are writing the data out. The partitionBy function takes a list of columns to partition the output on, as shown in Example 3-49. You can also manually save out separate DataFrames (say if you are writing from different jobs) with individual save calls.

Example 3-49. Save partitioned by zip code
  def writeOutByZip(input: DataFrame): Unit = {
    input.write.partitionBy("zipcode").format("json").save("output/")
  }

In addition to splitting the data by a partition key, it can be useful to make sure the resulting file sizes are reasonable, especially if the results will be used downstream by another Spark job.
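One hedged way to do this, assuming the data volume per output task is modest, is to coalesce before writing; numFiles is a hypothetical tuning parameter:

  def writeOutByZipSized(input: DataFrame, numFiles: Int): Unit = {
    // coalesce bounds the number of writer tasks, so each zipcode directory ends
    // up with at most numFiles files instead of one file per original partition.
    input.coalesce(numFiles).write.partitionBy("zipcode").format("json").save("output/")
  }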

Datasets

Datasets are an exciting extension of Spark SQL that provide additional compile-time type checking. Starting in Spark 2.0, DataFrames are now a specialized version of Datasets that operate on generic Row objects and therefore lack the normal compile-time type checking of Datasets. Datasets can be used when your data can be encoded for Spark SQL and you know the type information at compile time. The Dataset API is a strongly typed collection with a mixture of relational (DataFrame) and functional (RDD) transformations. Like DataFrames, Datasets are represented by a logical plan the Catalyst optimizer (see “Query Optimizer”) can work with, and when cached the data is stored in Spark SQL’s internal encoding format.

Warning

The Dataset API is new in Spark 1.6 and will change in future versions. Users of the Dataset API are advised to treat it as a “preview.” Up-to-date documentation on the Dataset API can be found in the Scaladoc.

Interoperability with RDDs, DataFrames, and Local Collections

Datasets can be easily converted to/from DataFrames and RDDs, but in the initial version they do not directly extend either. Converting to/from RDDs involves encoding/decoding the data into a different form. Converting to/from DataFrames is almost “free” in that the underlying data does not need to be changed; only extra compile-time type information is added/removed.

Tip

In Spark 2.0 the DataFrame type has been replaced with a type alias to Dataset[Row].

Warning

The type alias for DataFrame is not visible in Java for Spark 2.0, so updating Java code will require changing from DataFrame to Dataset<Row>.

To convert a DataFrame to a Dataset you can use the as[ElementType] function on the DataFrame to get a Dataset[ElementType] back, as shown in Example 3-50. The ElementType must be a case class, or similar, such as a tuple, consisting of types Spark SQL can represent (see “Basics of Schemas”). To create Datasets from local collections, createDataset(...) on the SQLContext and the toDS() implicit function on Seqs are provided in the same manner as createDataFrame(...) and toDF(). To convert an RDD to a Dataset, you can first convert it to a DataFrame and then convert that to a Dataset.
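A minimal sketch of both approaches, using RawPanda and the session from the earlier examples:

  def createDatasets(local: Seq[RawPanda], rdd: RDD[RawPanda]):
      (Dataset[RawPanda], Dataset[RawPanda]) = {
    import session.implicits._
    // From a local collection, via the toDS() implicit
    // (session.createDataset(local) is equivalent).
    val fromLocal = local.toDS()
    // From an RDD, by first converting to a DataFrame and then adding type info.
    val fromRDD = rdd.toDF().as[RawPanda]
    (fromLocal, fromRDD)
  }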

Tip

For loading data into a Dataset, unless a special API is provided by your data source, you can first load your data into a DataFrame and then convert it to a Dataset. Since the conversion to a Dataset simply adds type information, it is not eagerly evaluated, and future filters and similar operations can still be pushed down to the data store.

Example 3-50. Create a Dataset from a DataFrame
  def fromDF(df: DataFrame): Dataset[RawPanda] = {
    df.as[RawPanda]
  }

Converting from a Dataset back to an RDD or DataFrame can be done in similar ways as when converting DataFrames, and both are shown in Example 3-51. The toDF simply copies the logical plan used in the Dataset into a DataFrame—so you don’t need to do any schema inference or conversion as you do when converting from RDDs. Converting a Dataset of type T to an RDD of type T can be done by calling .rdd, which unlike calling toDF, does involve converting the data from the internal SQL format to the regular types.

Example 3-51. Convert Dataset to DataFrame and RDD
  /**
   * Illustrate converting a Dataset to an RDD
   */
  def toRDD(ds: Dataset[RawPanda]): RDD[RawPanda] = {
    ds.rdd
  }

  /**
   * Illustrate converting a Dataset to a DataFrame
   */
  def toDF(ds: Dataset[RawPanda]): DataFrame = {
    ds.toDF()
  }

Compile-Time Strong Typing

One of the reasons to use Datasets over traditional DataFrames is their compile-time strong typing. DataFrames have runtime schema information but lack compile-time information about the schema. This strong typing is especially useful when making libraries, because you can more clearly specify the requirements of your inputs and your return types.
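As a quick illustration, the following sketch contrasts a column-name typo on a DataFrame (caught only at runtime) with the same mistake on a Dataset (caught at compile time); the misspelled names are intentional, and session is the SparkSession from the earlier examples:

  def typoExample(df: DataFrame, ds: Dataset[RawPanda]): Dataset[String] = {
    import session.implicits._
    // df.select("zzip")  // compiles, but fails at runtime with an AnalysisException
    // ds.map(_.zzip)     // does not compile: RawPanda has no field "zzip"
    ds.map(_.zip)         // checked against RawPanda's fields at compile time
  }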

Easier Functional (RDD “like”) Transformations

One of the key advantages of the Dataset API is easier integration with custom Scala and Java code. Datasets expose filter, map, mapPartitions, and flatMap with similar function signatures as RDDs, with the notable requirement that your return ElementType also be understandable by Spark SQL (such as tuple or case class of types discussed in “Basics of Schemas”). Example 3-52 illustrates this using a simple map function.

Example 3-52. Functional query on Dataset
  def funMap(ds: Dataset[RawPanda]): Dataset[Double] = {
    ds.map{rp => rp.attributes.filter(_ > 0).sum}
  }

Beyond functional transformations, such as map and filter, you can also intermix relational and grouped/aggregate operations.

Relational Transformations

Datasets introduce a typed version of select for relational-style transformations. When specifying an expression for this you need to include the type information, as shown in Example 3-53. You can add this information by calling as[ReturnType] on the expression/column.

Example 3-53. Simple relational select on Dataset
  def squishyPandas(ds: Dataset[RawPanda]): Dataset[(Long, Boolean)] = {
    ds.select($"id".as[Long], ($"attributes"(0) > 0.5).as[Boolean])
  }
Warning

Some operations, such as select, have both typed and untyped implementations. If you supply a Column rather than a TypedColumn you will get a DataFrame back instead of a Dataset.

Multi-Dataset Relational Transformations

In addition to single Dataset transformations, there are also transformations for working with multiple Datasets. The standard set operations, namely intersect, union, and subtract, are all available with the same standard caveats as discussed in Table 3-9. Joining Datasets is also supported, but to make the type information easier to work with, the return structure is a bit different than traditional SQL joins.
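For example, joinWith returns a Dataset of pairs rather than a flattened result; here CoffeeShop is a hypothetical case class with a zip field:

  def joinByZip(pandas: Dataset[RawPanda], shops: Dataset[CoffeeShop]):
      Dataset[(RawPanda, CoffeeShop)] = {
    // Each element keeps both sides as typed objects rather than a single flat row.
    pandas.joinWith(shops, pandas("zip") === shops("zip"))
  }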

Grouped Operations on Datasets

Similar to grouped operations on DataFrames (described in “Aggregates and groupBy”), grouping a Dataset with an arbitrary function returns a GroupedDataset (prior to Spark 2.0) or a KeyValueGroupedDataset (in Spark 2.0, via groupByKey), while grouping with a relational/Dataset DSL expression returns a RelationalGroupedDataset. You can specify your aggregate functions on all of these, along with a functional mapGroups API. As with the expressions in “Relational Transformations”, you need to use typed expressions so the result can also be a Dataset.

Taking our previous example of computing the maximum panda size by zip in Example 3-23, you would rewrite it to be as shown in Example 3-54.

Tip

The convenience functions found on GroupedData (e.g., min, max, etc.) are missing, so all of our aggregate expressions need to be specified through agg.

Example 3-54. Compute the max panda size per zip code typed
  // Helper case class for the intermediate key/value; define it at the top level
  // rather than inside another class (see the earlier warning on nested case classes).
  case class MiniPandaInfo(zip: String, size: Double)

  def maxPandaSizePerZip(ds: Dataset[RawPanda]): Dataset[(String, Double)] = {
    ds.map(rp => MiniPandaInfo(rp.zip, rp.attributes(2)))
      .groupByKey(mp => mp.zip).agg(max("size").as[Double])
  }

Beyond applying typed SQL expressions to aggregated columns, you can also easily use arbitrary Scala code with mapGroups on grouped data as shown in Example 3-55. This can save us from having to write custom user-defined aggregate functions (UDAFs) (discussed in “Extending with User-Defined Functions and Aggregate Functions (UDFs, UDAFs)”). While custom UDAFs can be painful to write, they may be able to give better performance than mapGroups and can also be used on DataFrames.

Example 3-55. Compute the max panda size per zip code using map groups
  def maxPandaSizePerZipScala(ds: Dataset[RawPanda]): Dataset[(String, Double)] = {
    ds.groupByKey(rp => rp.zip).mapGroups{ case (g, iter) =>
      (g, iter.map(_.attributes(2)).reduceLeft(Math.max(_, _)))
    }
  }

Extending with User-Defined Functions and Aggregate Functions (UDFs, UDAFs)

User-defined functions and user-defined aggregate functions provide ways to extend the DataFrame and SQL APIs with your own custom code while still taking advantage of the Catalyst optimizer. The Dataset API (see “Datasets”) is another performant option for much of what you can do with UDFs and UDAFs. Staying within Spark SQL is quite useful for performance, since otherwise you would need to convert the data to an RDD (and potentially back again) to perform arbitrary functions, which is quite expensive. UDFs and UDAFs can also be accessed from inside of regular SQL expressions, making them accessible to analysts or others more comfortable with SQL.

Warning

When using UDFs or UDAFs written in non-JVM languages, such as Python, it is important to note that you lose much of the performance benefit, as the data must still be transferred out of the JVM.

Tip

If most of your work is in Python but you want to access some UDFs without the performance penalty, you can write your UDFs in Scala and register them for use in Python (as done in Sparkling Pandas).8

Writing nonaggregate UDFs for Spark SQL is incredibly simple: you write a regular function and register it using sqlContext.udf.register (sqlContext.udf().register in Java). A simple string length UDF is shown in Example 3-56. If you are registering a Java or Python UDF you also need to specify your return type.

Example 3-56. String length UDF
  def setupUDFs(sqlCtx: SQLContext) = {
    sqlCtx.udf.register("strLen", (s: String) => s.length())
  }
Tip

Even in JVM languages, UDFs are generally slower than the equivalent built-in SQL expression would be, if one exists. Some early work is being done in SPARK-14083 to parse JVM byte code and generate SQL expressions.

Aggregate functions (UDAFs) are somewhat trickier to write. Instead of writing a regular Scala function, you extend UserDefinedAggregateFunction and implement a number of methods, similar to the functions you might write for aggregateByKey on an RDD, except working with different data structures. While they can be complex to write, UDAFs can be quite performant compared with options like mapGroups on Datasets or even a simply written aggregateByKey on RDDs. You can then either use the UDAF directly on columns or add it to the function registry as you did for the nonaggregate UDF.

Example 3-57 is a simple UDAF for computing the average, although you will likely want to use Spark’s built in avg in real life.

Example 3-57. UDAF for computing the average
  def setupUDAFs(sqlCtx: SQLContext) = {
    class Avg extends UserDefinedAggregateFunction {
      // Input type
      def inputSchema: org.apache.spark.sql.types.StructType =
        StructType(StructField("value", DoubleType) :: Nil)

      def bufferSchema: StructType = StructType(
        StructField("count", LongType) ::
        StructField("sum", DoubleType) :: Nil
      )

      // Return type
      def dataType: DataType = DoubleType

      def deterministic: Boolean = true

      def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = 0L
        buffer(1) = 0.0
      }

      def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        buffer(0) = buffer.getAs[Long](0) + 1
        buffer(1) = buffer.getAs[Double](1) + input.getAs[Double](0)
      }

      def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = buffer1.getAs[Long](0) + buffer2.getAs[Long](0)
        buffer1(1) = buffer1.getAs[Double](1) + buffer2.getAs[Double](1)
      }

      def evaluate(buffer: Row): Any = {
        buffer.getDouble(1) / buffer.getLong(0)
      }
    }
    // Optionally register
    val avg = new Avg
    sqlCtx.udf.register("ourAvg", avg)
  }

This is a little more complicated than our regular UDF, so let’s take a look at what the different parts do. You start by specifying the input type, and then the schema of the buffer you will use to store the in-progress work. These schemas are specified in the same way as DataFrame and Dataset schemas, discussed in “Basics of Schemas”.

From there, the remaining methods mirror the functions you implement when writing aggregateByKey on an RDD, but instead of taking arbitrary Scala objects you work with Row and MutableAggregationBuffer. The final evaluate function takes the Row representing the aggregation buffer and returns the final result.
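A quick hedged sketch of using it, assuming the Avg class from Example 3-57 is defined somewhere visible and that df has a numeric column named "value" (both assumptions); once registered, it can also be called as ourAvg(value) from SQL:

  def useAvg(df: DataFrame): DataFrame = {
    val ourAvg = new Avg
    // Apply the UDAF directly on a column; the result is a one-row DataFrame.
    df.agg(ourAvg(df("value")))
  }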

UDFs, UDAFs, and Datasets all provide ways to intermix arbitrary code with Spark SQL.

Query Optimizer

Catalyst is the Spark SQL query optimizer, which is used to take the query plan and transform it into an execution plan that Spark can run. Much as our transformations on RDDs build up a DAG, as we apply relational and functional transformations on DataFrames/Datasets, Spark SQL builds up a tree representing our query plan, called a logical plan. Spark is able to apply a number of optimizations on the logical plan and can also choose between multiple physical plans for the same logical plan using a cost-based model.

Logical and Physical Plans

The logical plan you construct through transformations on DataFrames/Datasets (or SQL queries) starts out as an unresolved logical plan. Much like a compiler, the Spark optimizer is multiphased and before any optimizations can be performed, it needs to resolve the references and types of the expressions.

This resolved plan is referred to as the logical plan, and Spark applies a number of simplifications directly on the logical plan, producing an optimized logical plan.

These simplifications can be written using pattern matching on the tree, such as the rule for simplifying additions between two literals. The optimizer is not limited to pattern matching, and rules can also include arbitrary Scala code.

Once the logical plan has been optimized, Spark will produce a physical plan. The physical plan stage has both rule-based and cost-based optimizations to produce the optimal physical plan. One of the most important optimizations at this stage is predicate pushdown to the data source level.

Code Generation

As a final step, Spark may also apply code generation for components of the plan. Code generation is done using Janino to compile Java code. Earlier versions used Scala’s Quasi Quotes,9 but the overhead was too high to enable code generation for small datasets. In some TPC-DS queries, code generation can result in >10× improvements in performance.

Warning

In some early versions of Spark for complex queries, code generation can cause failures. If you are on an old version of Spark and run into an unexpected failure, it can be worth disabling codegen by setting spark.sql.codegen or spark.sql.tungsten.enabled to false (depending on version).

Large Query Plans and Iterative Algorithms

While the Catalyst optimizer is quite powerful, one of the cases where it currently runs into challenges is with very large query plans. These query plans tend to be the result of iterative algorithms, like graph algorithms or machine learning algorithms. One simple workaround is converting the data to an RDD and back to a DataFrame/Dataset at the end of each iteration, as shown in Example 3-58. If you’re in Python, be sure to use the underlying Java RDD rather than round-tripping through Python (see Example 7-5 for how to do this). Another, heavier-weight option is to write the data out to storage and continue from there.

Example 3-58. Round trip through RDD to cut query plan
    val rdd = df.rdd
    rdd.cache()
    sqlCtx.createDataFrame(rdd, df.schema)
Tip

This issue is being tracked in SPARK-13346 and you can see the workaround used in GraphFrames.

Debugging Spark SQL Queries

While Spark SQL’s query optimizer has access to much more information, we still sometimes need to take a peek under the hood to make sure it’s working as expected. Similar to toDebugString on RDDs, we have the explain and printSchema functions on DataFrames.

One thing that can make a big difference is figuring out whether Spark SQL was able to push down a filter. In early versions of Spark SQL, filter pushdown didn’t always happen as expected, so the filter sometimes needed to be reordered to sit right next to the data load. In newer versions, filter pushdown is more likely to fail because of a misconfiguration of your data source.
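A minimal sketch of checking this with explain; the column name is hypothetical, and the exact plan output (e.g., a PushedFilters entry in the scan node) varies by version and data source:

  def checkPushdown(df: DataFrame): Unit = {
    // Print both the logical and physical plans; for sources that support
    // push-down, the filter should appear in the data source scan node.
    df.filter(df("zip") === "94110").explain(true)
    df.printSchema()
  }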

JDBC/ODBC Server

Spark SQL provides a JDBC server to allow external tools, such as business intelligence GUIs like Tableau, to work with data accessible in Spark and to share resources. Spark SQL’s JDBC server requires that Spark be built with Hive support.

Tip

Since the server tends to be long lived and runs on a single context, it can also be a good way to share cached tables between multiple users.

Spark SQL’s JDBC server is based on Hive’s HiveServer2, and most connectors designed for HiveServer2 can be used directly with Spark SQL’s JDBC server. Simba also offers drivers specifically for Spark SQL.

The server can either be started from the command line or started using an existing HiveContext. The command-line start and stop scripts are ./sbin/start-thriftserver.sh and ./sbin/stop-thriftserver.sh. When starting from the command line, you can configure the different Spark SQL properties by specifying --hiveconf property=value. Many of the remaining command-line parameters match those of spark-submit. The default host and port are localhost:10000, which can be configured with hive.server2.thrift.bind.host and hive.server2.thrift.port.

Tip

When starting the JDBC server using an existing HiveContext, you can simply update the config properties on the context instead of specifying command-line parameters.

Examples 3-59 and 3-60 illustrate two different ways to configure the port used by the thrift server.

Example 3-59. Start JDBC server on a different port
./sbin/start-thriftserver.sh --hiveconf hive.server2.thrift.port=9090
Example 3-60. Start JDBC server on a different port in Scala
    hiveContext.setConf("hive.server2.thrift.port", "9090")
    HiveThriftServer2.startWithContext(hiveContext)
Tip

When starting the JDBC server on an existing HiveContext, make sure to shut down the JDBC server when exiting.

Conclusion

The considerations for using DataFrames/Datasets over RDDs are complex and changing with the rapid development of Spark SQL. One of the cases where Spark SQL can be difficult to use is when the number of partitions needed for different parts of your pipeline changes, or if you otherwise wish to control the partitioner. While RDDs lack the Catalyst optimizer and relational style queries, they are able to work with a wider variety of data types and provide more direct control over certain types of operations. DataFrames and Datasets also only work with a restricted subset of data types—but when your data is in one of these supported classes the performance improvements of using the Catalyst optimizer provide a compelling case for accepting those restrictions.

DataFrames can be used when you have primarily relational transformations, which can be extended with UDFs when necessary. Compared to RDDs, DataFrames benefit from the efficient storage format of Spark SQL, the Catalyst optimizer, and the ability to perform certain operations directly on the serialized data. One drawback to working with DataFrames is that they are not strongly typed at compile time, which can lead to errors with incorrect column access and other simple mistakes.

Datasets can be used when you want a mix of functional and relational transformations while benefiting from the optimizations for DataFrames and are, therefore, a great alternative to RDDs in many cases. As with RDDs, Datasets are parameterized on the type of data contained in them, which allows for strong compile-time type checking but requires that you know your data type at compile time (although Row or another generic type can be used). The additional type safety of Datasets can be beneficial even for applications that do not need the specific functionality of DataFrames. One potential drawback is that the Dataset API is continuing to evolve, so updating to future versions of Spark may require code changes.

Pure RDDs work well for data that does not fit into the Catalyst optimizer. RDDs have an extensive and stable functional API, and upgrades to newer versions of Spark are unlikely to require substantial code changes. RDDs also make it easy to control partitioning, which can be very useful for many distributed algorithms. Some types of operations, such as multicolumn aggregates, complex joins, and windowed operations, can be daunting to express with the RDD API. RDDs can work with any Java- or Kryo-serializable data, although the serialization is often more expensive and less space-efficient than the equivalent in DataFrames/Datasets.

Now that you have a good understanding of Spark SQL, it’s time to continue on to joins, for both RDDs and Spark SQL.

1 UDFs allow us to extend SQL to have additional powers, such as computing the geospatial distance between points.

2 A row at a time allows for narrow transformations with no shuffle.

3 A column literal is a column with a fixed value that doesn’t change between rows (i.e., constant).

4 StringIndexer in the ML pipeline is designed for string index encoding.

5 Some types may not be correctly implemented for all databases.

6 spark-csv is now included as part of Spark 2.0.

7 For example, only reading the required partitions when a filter matches one of our partitioning schemes.

8 As of this writing, the Sparkling Pandas project development is on hold but early releases still contain some interesting examples of using JVM code from Python.

9 Scala Quasi Quotes are part of Scala’s macro system.
