In this chapter, we will focus on the ease of downloading Spark and walk through three simple steps for you as a developer—whether a data scientist or data engineer—to get started writing your first standalone application.
We will use local mode, one of the preferred ways to learn Spark easily, which gives you a quick feedback loop for iteratively performing Spark operations. Spark shells are a quick way to prototype, learn, or test Spark operations with small datasets before writing a complex Spark application. For large datasets or real workloads where you want to reap the benefits of distributed execution, however, local mode is not suitable; keep this mode strictly for learning and rapid prototyping.
While the Spark shells only support Scala, Python, and R, you can write a Spark application in any of the supported languages (Scala, Java, Python, and R), as well as issue queries in Spark SQL. We do expect some familiarity with the language of your choice to use Spark.

Step 1: Download Apache Spark
First, go to https://spark.apache.org/downloads.html and select from the pull-down menu “Prebuilt For Hadoop 2.7 and later” and click on link 3: “Download Spark” (Fig. 2-1).
Note: At the time of publishing this book, Apache Spark 3.0 was still in preview mode, but you can download the latest Spark 3.0 using the same download method and instructions.
This will download the tarball spark-3.0.0-preview2-bin-hadoop2.7.tgz. While it has all the Hadoop-related binaries you will need to run Spark in local mode on your laptop, you can select the matching Hadoop version from the pull-down menu if you are going to install Spark on an existing HDFS or Hadoop installation. How to build from source is beyond the scope of this book, but you can read more about it in the documentation.1
Since the release of Apache Spark 2.2, Python developers who only care about learning Spark in Python can install PySpark from the PyPI2 repository. They do not have to install all the other libraries necessary to run Scala, Java, or R, which also makes the binary smaller.
To install PySpark from PyPI, just run:

pip install pyspark
Note: If you only install pyspark, you will need to install Java 8 or above on your machine and set the JAVA_HOME environment variable. See the instructions on how to download and install Java.3 Also, if you want to run R in an interpretive shell mode, you must install R4 and then run:

sparkR
We assume that you are running a version of the Linux or OS X operating system on your laptop or cluster, and all the commands and instructions in this book will be in that flavor. Once you have downloaded the tarball, unpack it and cd into the unpacked directory (Figure 2-2).
➜  spark-3.0.0-preview2-bin-hadoop2.7 ls
LICENSE   R          RELEASE  conf  examples  kubernetes  python  yarn
NOTICE    README.md  bin      data  jars      licenses    sbin
Figure 2-2 Unpacking Spark binaries and list of important files and directories
Let’s briefly summarize the intent and purpose of some of these files and directories. New files and directories have been added since Spark 2.x and in 3.0, and the contents and descriptions of some have changed too.
README.md
Contains detailed instructions on how to use the Spark shells, build Spark from source, run standalone Spark examples, find links to the Spark documentation and configuration guides, and contribute to Spark.
bin
This directory, as the name suggests, contains most of the scripts you’ll employ to interact with Spark, including the Spark shells spark-sql, pyspark, and spark-shell. Later in this chapter we will use these shells, submit a standalone Spark application using spark-submit, and use a script that builds and pushes Docker images when running Spark with Kubernetes support.
sbin
Most of the scripts in this directory are administrative in purpose, for starting and stopping Spark components in a cluster in its various deployment modes. For the deployment modes, see the cheat sheet in Table 1-1 in Chapter 1.
kubernetes
Since Spark gained Kubernetes support, this directory contains Dockerfiles for creating Docker images of your Spark distribution for a Kubernetes cluster. It also outlines instructions on how to build the Spark distribution before building your Docker images.
data
This directory is populated with *.txt files that serve as input for Spark’s components: MLlib, Streaming, and GraphX.
examples
For any developer, two imperatives that ease the journey to learning any new platform are: 1) loads of code examples illustrating how to do something, and 2) comprehensive documentation. With Spark, these examples span Java, Python, R, and Scala, and you will want to employ them when learning Spark. We will allude to some of these examples in this and subsequent chapters.
As mentioned earlier, Spark comes with three widely used interpreters that act like interactive “shells” that enable ad hoc data analysis: pyspark, spark-shell, and sparkR. In many ways, their interactivity imitates shells you are already familiar with if you have used Python, Scala, R, SQL, or UNIX operating system shells such as bash or Bourne shell.
These shells have been augmented to support connecting to a cluster and loading distributed data into Spark workers’ memory. Whether you are dealing with gigabytes of data or small datasets, Spark shells are conducive to learning Spark quickly.
Let’s just dive into them. The best way to learn anything is by doing it.
To start PySpark, cd to the bin directory and launch the shell by typing pyspark. If you have installed PySpark from PyPI, then just typing pyspark from any directory will suffice.
➜  spark-3.0.0-preview2-bin-hadoop2.7 pyspark
Python 3.7.3 (default, Mar 27 2019, 09:23:15)
[Clang 10.0.1 (clang-1001.0.46.3)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/Users/julesdamji/spark/spark-3.0.0-preview2-bin-hadoop2.7/jars/spark-unsafe_2.12-3.0.0-preview2.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
20/01/10 09:57:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.0.0-preview2
      /_/

Using Python version 3.7.3 (default, Mar 27 2019 09:23:15)
SparkSession available as 'spark'.
>>> spark.version
'3.0.0-preview2'
>>>
Figure 2-3 PySpark interpretive shell launched in local mode on a laptop
To start a similar Spark shell with Scala, cd to the bin directory and type spark-shell.
➜  spark-3.0.0-preview2-bin-hadoop2.7 spark-shell
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/Users/julesdamji/spark/spark-3.0.0-preview2-bin-hadoop2.7/jars/spark-unsafe_2.12-3.0.0-preview2.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
20/01/10 10:00:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://10.0.1.2:4040
Spark context available as 'sc' (master = local[*], app id = local-1578679214780).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.0-preview2
      /_/

Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 12.0.1)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.version
res0: String = 3.0.0-preview2

scala>
Figure 2-4 Spark Scala interpretive shell launched in local mode on a laptop
We downloaded and installed Spark on our local machine or laptop. For the remainder of this chapter, we will use Spark interpretive shells locally. That is, Spark will be running in local mode.
Note: Refer to the deployment cheat sheet in Table 1-1 (Chapter 1) for a reminder of which components run where in local mode.
As noted in the previous chapter, Spark computations are expressed as operations. These operations are then converted into low-level RDD-based bytecode as tasks, which are then distributed to Spark’s workers for execution.
Let’s look at a short example where we read a text file in as a DataFrame, show a sample of the strings read, and count the total number of lines in the file. This simple example uses the high-level Structured APIs (which we will cover in the next chapter). The show(10, false) operation on the DataFrame displays only the first 10 lines without truncating them; by default the truncate boolean flag is true.
scala> val strings = spark.read.text("README.md")
strings: org.apache.spark.sql.DataFrame = [value: string]

scala> strings.show(10, false)
+--------------------------------------------------------------------------------+
|value                                                                           |
+--------------------------------------------------------------------------------+
|# Apache Spark                                                                  |
|                                                                                |
|Spark is a unified analytics engine for large-scale data processing. It provides|
|high-level APIs in Scala, Java, Python, and R, and an optimized engine that     |
|supports general computation graphs for data analysis. It also supports a       |
|rich set of higher-level tools including Spark SQL for SQL and DataFrames,      |
|MLlib for machine learning, GraphX for graph processing,                        |
|and Structured Streaming for stream processing.                                 |
|                                                                                |
|<https://spark.apache.org/>                                                     |
+--------------------------------------------------------------------------------+
only showing top 10 rows

scala> strings.count()
res2: Long = 109

scala>
Example 2-1 Scala line count
Quite simple. Let’s look at a similar example using the Python interpretive shell, pyspark.
➜  spark-3.0.0-preview2-bin-hadoop2.7 pyspark
Python 3.7.3 (default, Mar 27 2019, 09:23:15)
[Clang 10.0.1 (clang-1001.0.46.3)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/Users/julesdamji/spark/spark-3.0.0-preview2-bin-hadoop2.7/jars/spark-unsafe_2.12-3.0.0-preview2.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
20/01/10 11:28:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.0.0-preview2
      /_/

Using Python version 3.7.3 (default, Mar 27 2019 09:23:15)
SparkSession available as 'spark'.
>>> strings = spark.read.text("README.md")
>>> strings.show(10, truncate=False)
+--------------------------------------------------------------------------------+
|value                                                                           |
+--------------------------------------------------------------------------------+
|# Apache Spark                                                                  |
|                                                                                |
|Spark is a unified analytics engine for large-scale data processing. It provides|
|high-level APIs in Scala, Java, Python, and R, and an optimized engine that     |
|supports general computation graphs for data analysis. It also supports a       |
|rich set of higher-level tools including Spark SQL for SQL and DataFrames,      |
|MLlib for machine learning, GraphX for graph processing,                        |
|and Structured Streaming for stream processing.                                 |
|                                                                                |
|<https://spark.apache.org/>                                                     |
+--------------------------------------------------------------------------------+
only showing top 10 rows

>>> strings.count()
109
>>>
Example 2-2 Python line count
To exit any of the Spark shells, press Ctrl-D. As you can see, this rapid interactivity with Spark shells is conducive not only to rapid learning but to rapid prototyping, too.
In both examples above, notice the API syntax and signature parity between Scala and Python. Throughout Spark’s evolution from 1.x, that parity has been one of the enduring improvements, among many.
Also, we used the high-level Structured APIs to read a text file into a Spark DataFrame rather than an RDD. Throughout the book, we will focus more on high-level Structured APIs since in Spark 2.x, RDDs are now consigned to low-level APIs.
Note: Every computation expressed in the high-level Structured APIs is decomposed into low-level, optimized and generated RDD operations and then converted into Scala bytecode for the executors’ JVMs. This generated RDD operation code is not accessible to users, nor is it the same as the user-facing RDD APIs.
Now that we have downloaded Spark, installed it on our laptop in standalone mode, launched Spark shells, and executed short code examples interactively, let’s take the final step.
To get a glimpse of what happens under the hood with our small sample code, let’s first understand the key concepts of a Spark application, and learn how the code is transformed and executed as tasks across the Spark Executors. In the discussion below, you’ll come across some terms and concepts, so we have summarized the definitions in the table below.5
Term | Meaning
Application | User program built on Spark using its APIs. It consists of a driver program and executors on the cluster.
SparkSession | An object that provides a point of entry to interact with the underlying Spark functionality and allows programming Spark with its APIs. In an interactive Spark shell, the Spark driver instantiates a SparkSession for you, while in a Spark application, you create a SparkSession object yourself.
Job | A parallel computation consisting of multiple tasks that gets spawned in response to a Spark action (e.g., save, collect).
Stage | Each job gets divided into smaller sets of tasks called stages that depend on each other.
Task | A single unit of work or execution that will be sent to a Spark executor.
Table 2-1 Definitions for some terms and concepts
At the core (and part) of every Spark application is the Spark driver program, which creates a SparkSession object for you. In the case of the Spark shells, the driver is part of the shell and the SparkSession object (accessible via the variable spark) is created for you, as shown in Figure 2-4.
Because you launched the Spark shell locally on your laptop, all the operations from our code sample ran locally, in a single JVM. But you can just as easily launch a Spark shell to analyze data in parallel on a cluster. Running spark-shell --help or pyspark --help will show you how to connect to a Spark cluster manager. Figure 2-5 shows how Spark executes on a cluster once it has interacted with the cluster manager.
Once we have a SparkSession, we can program Spark using the APIs to perform Spark operations6.
During interactive sessions with the Spark shells, the driver converts your Spark application into one or more Spark jobs (Fig. 2-6). It then transforms each job into a Directed Acyclic Graph (DAG). This, in essence, is Spark’s execution plan, where each node within the DAG could be one or more Spark stages.
As part of the DAG nodes, stages are created based on what operations can be performed serially or in parallel. Not all Spark operations can happen in a single stage, so they may be divided into multiple stages. Often, stages are delineated at an operator’s computation boundaries, where they dictate data transfer among Spark executors.
Each stage is comprised of Spark tasks (units of execution), which are then federated across the Spark executors; each task maps to a single core and works on a single partition of data. As such, an executor with 16 cores can have 16 or more tasks working on 16 or more partitions in parallel, making the execution of Spark’s tasks exceedingly parallel!
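To make the task-per-core, task-per-partition model concrete, here is a plain-Python analogy (not Spark code): the data is split into hypothetical "partitions", and a pool of workers runs one task per partition in parallel, with the partial results combined at the end.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical data split into 4 "partitions"; in Spark, each task
# would process one such partition on one executor core.
partitions = [list(range(i, 100, 4)) for i in range(4)]

def task(partition):
    # A task works on exactly one partition of data.
    return sum(partition)

# One worker per "core": the tasks run in parallel, one per partition.
with ThreadPoolExecutor(max_workers=4) as pool:
    partial_sums = list(pool.map(task, partitions))

# Combine the partial results from each "partition".
total = sum(partial_sums)
print(total)  # sum of 0..99 = 4950
```

The same idea scales to a cluster: more cores and more partitions simply mean more tasks running concurrently.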
Spark operations on distributed data can be classified into two types: transformations and actions. Transformations, as the name suggests, transform a Spark DataFrame into a new DataFrame without altering the original data, giving it the property of immutability. Put another way, an operation such as select() or filter() will not change the original DataFrame; instead, it will return the transformed result of the operation as a new DataFrame.
All transformations are evaluated lazily. That is, their results are not computed immediately; instead they are recorded or remembered as a lineage. The recorded lineage allows Spark, later in its execution plan, to rearrange certain transformations, coalesce them, or optimize them into stages for more efficient execution. Lazy evaluation is Spark’s strategy for delaying execution until an action is invoked or data is "touched" (read from or written to disk).
By contrast, an action triggers the lazy evaluation of all the transformations recorded. In the above figure, all transformations T are recorded until the action A is invoked. Each transformation T produces a new DataFrame.
While lazy evaluation allows Spark to optimize your queries by peeking into your chained transformations, lineage and data immutability provide fault tolerance. Because Spark records each transformation in its lineage and the DataFrames are immutable between transformations, it can reproduce its original state by simply replaying the recorded lineage, giving it resiliency in the event of failures.
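The interplay of recorded lineage, immutability, and action-triggered evaluation can be sketched in plain Python (an analogy, not Spark code; the LazyFrame class and its methods are made up for illustration):

```python
# A minimal sketch of lazy evaluation: transformations only record
# themselves in a lineage; an action replays the lineage over the data.
class LazyFrame:
    def __init__(self, data, lineage=()):
        self.data = data        # the original data is never mutated
        self.lineage = lineage  # the recorded transformations

    def filter(self, predicate):
        # A transformation: returns a *new* LazyFrame with an extended
        # lineage; nothing is computed yet.
        return LazyFrame(self.data, self.lineage + (("filter", predicate),))

    def count(self):
        # An action: replay the recorded lineage, then compute the result.
        rows = self.data
        for op, fn in self.lineage:
            if op == "filter":
                rows = [r for r in rows if fn(r)]
        return len(rows)

lines = ["# Apache Spark", "", "Spark is a unified analytics engine"]
df = LazyFrame(lines)
filtered = df.filter(lambda r: "Spark" in r)  # recorded, not executed
print(filtered.count())                       # the action triggers execution: 2
```

If a worker holding `filtered` were lost, the same result could be recovered by replaying the lineage against the original data, which is exactly the resiliency property described above.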
Some examples of transformations and actions are:
Transformations | Actions
orderBy | show
groupBy | take
filter | count
select | collect
join | save
Table 2-2 Transformations and Actions as Spark operations
Together with actions, transformations contribute to a Spark query plan (we will cover query plans in the next chapter), but nothing is executed until an action is invoked. For example, the example below has two transformations (read() and filter()) and one action (count()):
Python Example 2-1
>>> strings = spark.read.text("README.md")
>>> filtered = strings.filter(strings.value.contains("Spark"))
>>> filtered.count()
20
The action triggers the execution of all the transformations recorded as part of the query execution plan (see Chapter 3 for query execution plans). In the example above, nothing happens until filtered.count() is executed in the shell.
As noted above, transformations are operations that Spark evaluates lazily. A huge advantage of the lazy evaluation scheme is that Spark can inspect your computational query and ascertain how to optimize it. This optimization can be done by either joining or pipelining some operations and assigning them to a stage, or by breaking them into stages after determining which operations require a shuffle or exchange of data across the cluster.
Transformations can be classified as having either narrow dependencies or wide dependencies. Any transformation where a single output partition can be computed from a single input partition is a narrow transformation. For example, in the above code snippet, filter() and contains() represent narrow transformations because they can operate on a single partition and produce the resulting output partition without any exchange of data.
However, groupBy() or orderBy(), as shown in Figure 2-11, instruct Spark to perform wide transformations, where data from other partitions is read in, combined, and written to disk. Since each partition will have its own count of rows that contain the word "Spark", a grouped count (groupBy()) will force a shuffle of data from each of the executors’ partitions across the cluster. In this transformation, the count requires output from other partitions to compute the final aggregation.
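The difference between narrow and wide dependencies can be sketched in plain Python (again an analogy, not Spark code; the partitions and rows are made-up data): a filter touches only its own partition, while a grouped count must combine partial results from every partition, which is what forces a shuffle in Spark.

```python
from collections import Counter

# Hypothetical rows of "<state> <word>" split across two partitions.
partitions = [
    ["CA Spark", "CA Hadoop"],   # partition 0
    ["TX Spark", "CA Spark"],    # partition 1
]

# Narrow: each output partition depends only on its single input
# partition; no data is exchanged between partitions.
filtered = [[row for row in part if "Spark" in row] for part in partitions]

# Wide: a per-state count first produces partial counts per partition,
# then must "shuffle" and merge them across all partitions to get the
# final aggregation.
partial_counts = [Counter(row.split()[0] for row in part) for part in filtered]
final_counts = sum(partial_counts, Counter())
print(dict(final_counts))  # {'CA': 2, 'TX': 1}
```

Note how no single partition could have produced `final_counts` alone; the merge step is the analogue of Spark’s shuffle between stages.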
Spark includes a graphical user interface to inspect or monitor Spark applications in their various stages of decomposition. The driver, depending on how Spark is deployed, launches a web UI running by default on port 4040. You can view metrics and details such as:7
a list of scheduler stages and tasks;
a summary of RDD sizes and memory usage;
environmental information;
information about the running executors; and
all Spark SQL queries.
In local mode, you can access this interface at http://localhost:4040 in a web browser.
Note: When you launch spark-shell, part of the output shows the localhost URL to access at port 4040.
So let’s inspect how our short Python Example 2-1 above from the Spark shell translates into jobs, stages, and tasks and view what our DAG looks like. The driver created a single job and a single stage.
If we observe Figure 2-11, we notice that there is no exchange required since there is only a single stage.
Stage 0 is comprised of three tasks. These tasks will be executed in parallel.
In these figures above, notice how each stage has its individual operations shown as part of darker blue boxes.
We will cover in detail how to read these stages, tasks, shuffles, and partitioning in the Spark UI in chapter 8. For now, just note that Spark UI provides a microscopic lens into Spark’s internal workings as a tool for debugging and inspecting.
Aside from using your local machine to run Spark in local mode, you can try any of the examples in this chapter, or any other chapter, on the Databricks Community Edition for free (Figure 2-1). As a learning tool for Apache Spark, the Community Edition has many tutorials and examples worthy of note. In addition to writing your own notebooks in Python, R, Scala, or SQL, you can also import other notebooks, including Jupyter notebooks.
Note: Databricks is the company that offers managed Apache Spark in the cloud. The Community Edition is a free version to learn Spark.
To get an account, go to https://databricks.com/try, and follow the instructions to try the Community Edition for free.
For your learning and exploring, the Spark distribution comes with a set of Spark applications for each of Spark’s components. You are welcome to peruse the examples directory in your installation location. For simplicity, we will use the hello world of distributed computing called wordcount, which we are sure you have never heard of!
From your local machine in the installation directory, you can run one of the several Java or Scala sample programs; use bin/run-example <class> [params] to run one of many examples provided.
./bin/run-example JavaWordCount README.md
This will spew out INFO messages on your console, along with a list of each word in README.md and its count.
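What JavaWordCount computes can be sketched in a few lines of plain Python (the sample lines below stand in for the real README.md): split each line into words and tally the occurrences of each word.

```python
from collections import Counter

# A couple of made-up sample lines standing in for README.md.
lines = [
    "# Apache Spark",
    "Spark is a unified analytics engine",
]

# Split every line on spaces and count each word's occurrences.
counts = Counter(word for line in lines for word in line.split(" "))
print(counts["Spark"])  # 2
```

Spark performs the same computation, but with each partition counted in parallel on the executors and the partial counts merged in a final aggregation.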
In the previous example, we counted the words in a file. If the file were huge, it would be distributed across a cluster, partitioned into small chunks of data, and our Spark program would distribute the task of counting each word in each partition and return the final aggregated count to us. But that example has become a bit of a cliché.
Let’s solve another similar problem but with a larger data set and using more of Spark’s distribution functionality and Structured DataFrame API. We will cover the APIs used in this program in later chapters, but for now bear with us.
Among the authors of this book is a data scientist who loves to bake cookies with M&Ms in them, and she rewards her students in the states where she frequently teaches machine learning and data science courses. But she’s data-driven, obviously, and wants to ensure that she gets the right colors of M&Ms in the cookies for her students in the respective states.
Let’s write a Spark program that reads a file with over 100,000 entries, where each row or line has a <state, mnm_color, count> record, and computes and aggregates the count for each color and state. These aggregated counts tell us the colors of M&Ms favored by students in each state.
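Before the full Spark program, here is the same aggregation sketched with only the Python standard library, using a few made-up rows in place of the real mnm_dataset.csv. Note that aggregating with count() in the Spark program counts the *number of rows* per (state, color) group, which is what we mimic here:

```python
from collections import Counter

# Made-up (state, mnm_color, count) records standing in for the dataset.
rows = [
    ("CA", "Yellow", 10), ("CA", "Yellow", 5),
    ("CA", "Green", 7), ("TX", "Red", 3),
]

# Count the number of rows per (state, color) group, like
# groupBy("State", "Color").agg(count("Count")) in the Spark program.
totals = Counter((state, color) for state, color, _ in rows)

# Rank in descending order of total, like orderBy("Total", ascending=False).
ranked = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)
print(ranked[0])  # (('CA', 'Yellow'), 2)
```

The Spark version below does exactly this, but on 100,000+ rows split into partitions, with the grouping forcing a shuffle across the executors.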
Python Example 2-2 Complete list of counting and aggregating M&Ms
# Import the necessary libraries.
# Since we are using Python, import the SparkSession and related functions
# from the PySpark module.
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import count

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: mnmcount <file>", file=sys.stderr)
        sys.exit(-1)

    # Build a SparkSession using the SparkSession APIs.
    # If one does not exist, then create an instance. There
    # can only be one SparkSession per JVM.
    spark = (SparkSession
        .builder
        .appName("PythonMnMCount")
        .getOrCreate())

    # Get the M&M data set file name from the command-line arguments.
    mnm_file = sys.argv[1]

    # Read the file into a Spark DataFrame using the CSV
    # format by inferring the schema and specifying that the
    # file contains a header, which provides the column names for the
    # comma-separated fields.
    mnm_df = (spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(mnm_file))

    # We use the high-level Structured DataFrame APIs. Note
    # that we don't use RDDs at all. Because some of Spark's
    # functions return the same object, we can chain function calls.
    # 1. Select from the DataFrame the fields "State", "Color", and "Count".
    # 2. Since we want to group each state with its M&M color
    #    counts, we use groupBy().
    # 3. Aggregate the counts of all colors, grouped by state and color.
    # 4. orderBy() in descending order of the total count.
    count_mnm_df = (mnm_df.select("State", "Color", "Count")
        .groupBy("State", "Color")
        .agg(count("Count")
        .alias("Total"))
        .orderBy("Total", ascending=False))

    # Show the resulting aggregations for all the states and colors:
    # a total count of each color per state.
    # Note that show() is an action, which will trigger the above
    # query to be executed.
    count_mnm_df.show(n=60, truncate=False)
    print("Total Rows = %d" % (count_mnm_df.count()))

    # While the above code aggregated and counted for all
    # the states, what if we want just a single state, e.g., CA?
    # 1. Select from all rows in the DataFrame.
    # 2. Filter on the CA state only.
    # 3. groupBy() State and Color as we did above.
    # 4. Aggregate the counts for each color.
    # 5. orderBy() Total in descending order.
    # Find the aggregate counts for California by filtering.
    ca_count_mnm_df = (mnm_df.select("State", "Color", "Count")
        .where(mnm_df.State == 'CA')
        .groupBy("State", "Color")
        .agg(count("Count")
        .alias("Total"))
        .orderBy("Total", ascending=False))

    # Show the resulting aggregation for California.
    # As above, show() is an action that will trigger the execution
    # of the entire computation.
    ca_count_mnm_df.show(n=10, truncate=False)

    # Stop the SparkSession.
    spark.stop()
You can enter this code in a Python file called mnmcount.py using your favorite editor, download the mnm_dataset.csv file (from the GitHub repo for Learning Spark, 2nd edition), and submit it as a Spark job using the spark-submit script in the installation’s bin directory. Set your SPARK_HOME environment variable to the root-level directory where you installed Spark on your local machine.
To avoid having verbose INFO messages printed to the console, set log4j.rootCategory=WARN in the conf/log4j.properties file. Now let’s submit our first Spark job using the Python APIs. For an explanation of what the code does, please read the inline comments in Example 2-2.
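For example, you can create conf/log4j.properties by copying the bundled conf/log4j.properties.template (whose exact contents may vary by Spark version) and change the root logger line to:

```
# Log only warnings and errors to the console
log4j.rootCategory=WARN, console
```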
Note: The code above uses the DataFrame API, which reads like high-level Domain Specific Language (DSL) queries. We will cover these APIs in the next chapter. But for now, note the clarity and simplicity with which you can instruct Spark what to do, not how to do it, unlike with the RDD API. Cool stuff!
$SPARK_HOME/bin/spark-submit mnmcount.py mnm_dataset.csv
19/03/15 13:08:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
+-----+------+-----+
|State|Color |Total|
+-----+------+-----+
|CA   |Yellow|1807 |
|WA   |Green |1779 |
|OR   |Orange|1743 |
|TX   |Green |1737 |
|TX   |Red   |1725 |
|CA   |Green |1723 |
|CO   |Yellow|1721 |
|CA   |Brown |1718 |
|CO   |Green |1713 |
|NV   |Orange|1712 |
|TX   |Yellow|1703 |
|NV   |Green |1698 |
|AZ   |Brown |1698 |
|CO   |Blue  |1695 |
|WY   |Green |1695 |
|NM   |Red   |1690 |
|AZ   |Orange|1689 |
|NM   |Yellow|1688 |
|NM   |Brown |1687 |
|UT   |Orange|1684 |
|NM   |Green |1682 |
|UT   |Red   |1680 |
|AZ   |Green |1676 |
|NV   |Yellow|1675 |
|NV   |Blue  |1673 |
|WA   |Red   |1671 |
|WY   |Red   |1670 |
|WA   |Brown |1669 |
|NM   |Orange|1665 |
|WY   |Blue  |1664 |
|WA   |Yellow|1663 |
|WA   |Orange|1658 |
|NV   |Brown |1657 |
|CA   |Orange|1657 |
|CA   |Red   |1656 |
|CO   |Brown |1656 |
|UT   |Blue  |1655 |
|AZ   |Yellow|1654 |
|TX   |Orange|1652 |
|AZ   |Red   |1648 |
|OR   |Blue  |1646 |
|UT   |Yellow|1645 |
|OR   |Red   |1645 |
|CO   |Orange|1642 |
|TX   |Brown |1641 |
|NM   |Blue  |1638 |
|AZ   |Blue  |1636 |
|OR   |Green |1634 |
|UT   |Brown |1631 |
|WY   |Yellow|1626 |
|WA   |Blue  |1625 |
|CO   |Red   |1624 |
|OR   |Brown |1621 |
|TX   |Blue  |1614 |
|OR   |Yellow|1614 |
|NV   |Red   |1610 |
|CA   |Blue  |1603 |
|WY   |Orange|1595 |
|UT   |Green |1591 |
|WY   |Brown |1532 |
+-----+------+-----+
Total Rows = 60

+-----+------+-----+
|State|Color |Total|
+-----+------+-----+
|CA   |Yellow|1807 |
|CA   |Green |1723 |
|CA   |Brown |1718 |
|CA   |Orange|1657 |
|CA   |Red   |1656 |
|CA   |Blue  |1603 |
+-----+------+-----+
The output above is from the submitted Spark job, first showing the aggregations of each M&M color for every state, followed by those for CA only.
What if you want to use the Scala version of this same Spark program? The APIs are no different; in Spark 2.x, parity is well preserved across the supported languages, with only minor syntax differences. Example 2-3 is the Scala version of the identical code.
Scala Example 2-3
package main.scala.chapter2

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

/**
 * Usage: MnMcount <mnm_file_dataset>
 */
object MnMcount {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder
      .appName("MnMCount")
      .getOrCreate()

    if (args.length < 1) {
      print("Usage: MnMcount <mnm_file_dataset>")
      sys.exit(1)
    }

    // Get the M&M data set file name.
    val mnm_file = args(0)

    // Read the file into a Spark DataFrame.
    val mnm_df = spark.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(mnm_file)

    // Aggregate the counts of all colors, grouped by state and color,
    // and orderBy in descending order.
    val count_mnm_df = mnm_df.select("State", "Color", "Count")
      .groupBy("State", "Color")
      .agg(count("Count")
      .alias("Total"))
      .orderBy(desc("Total"))

    // Show the resulting aggregations for all the states and colors.
    count_mnm_df.show(60)
    println(s"Total Rows = ${count_mnm_df.count()}")
    println()

    // Find the aggregate counts for California by filtering.
    val ca_count_mnm_df = mnm_df.select("*")
      .where(col("State") === "CA")
      .groupBy("State", "Color")
      .agg(count("Count")
      .alias("Total"))
      .orderBy(desc("Total"))

    // Show the resulting aggregation for California.
    ca_count_mnm_df.show(10)

    // Stop the SparkSession.
    spark.stop()
  }
}
We will now cover how to build your first Scala Spark program, using the Scala Build Tool (sbt).10 Since Python is an interpreted language and there is no compile step (though it is possible to compile Python code into bytecode in .pyc files), we will not go into this step here. To use Maven to build Java Spark programs, we refer you to the guide on the Apache Spark website.11 For brevity, in this book we cover examples mainly in Python and Scala.
The build.sbt file is a specification file, like a Makefile, that tells the Scala compiler how to build your Scala-related tasks, such as jars and packages, what dependencies to resolve, and where to look for them. In our case, we have a simple sbt file for our M&M code.
Example 2-13 sbt build file.
// Name of the package
name := "main/scala/chapter2"
// Version of our package
version := "1.0"
// Version of Scala
scalaVersion := "2.12.10"
// Spark library dependencies
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.0.0-preview2",
  "org.apache.spark" %% "spark-sql" % "3.0.0-preview2"
)
Assuming that you have the JDK and sbt installed, and JAVA_HOME and SPARK_HOME set, you can build your Spark application with a single command:
$ sbt clean package
[info] Updated file /Users/julesdamji/gits/LearningSparkV2/chapter2/scala/project/build.properties: set sbt.version to 1.2.8
[info] Loading project definition from /Users/julesdamji/gits/LearningSparkV2/chapter2/scala/project
[info] Updating
[info] Done updating.
...
[info] Compiling 1 Scala source to /Users/julesdamji/gits/LearningSparkV2/chapter2/scala/target/scala-2.12/classes ...
[info] Done compiling.
[info] Packaging /Users/julesdamji/gits/LearningSparkV2/chapter2/scala/target/scala-2.12/main-scala-chapter2_2.12-1.0.jar ...
[info] Done packaging.
[success] Total time: 6 s, completed Jan 11, 2020, 4:11:02 PM
After a successful build, you can run the above Scala version of M&M count example as follows:
$SPARK_HOME/bin/spark-submit --class main.scala.chapter2.MnMcount \
  jars/main-scala-chapter2_2.12-1.0.jar data/mnm_dataset.csv
...
20/01/11 16:00:48 INFO TaskSchedulerImpl: Killing all running tasks in stage 4: Stage finished
20/01/11 16:00:48 INFO DAGScheduler: Job 4 finished: show at MnMcount.scala:49, took 0.264579 s
+-----+------+-----+
|State| Color|Total|
+-----+------+-----+
|   CA|Yellow| 1807|
|   CA| Green| 1723|
|   CA| Brown| 1718|
|   CA|Orange| 1657|
|   CA|   Red| 1656|
|   CA|  Blue| 1603|
+-----+------+-----+
This run will produce the same output as the one above with the Python run. Try it!
So there you have it. Our author data scientist will be more than happy to use this data to decide which colors of M&Ms to use in her cookies for her next class in any of the states she teaches in.
In this chapter, we have covered three simple steps for you to get started: downloading Apache Spark; using Scala or PySpark interactive shells; and understanding high-level Spark application concepts and terms—all imperative to commence your developer journey to learn Spark. We gave a quick overview of the process by which you can use transformations and actions to write a Spark application. We briefly introduced the Spark UI to examine the relevant jobs, stages, and tasks created.
And finally, through a short example, we showed you how you can use the high-level Structured APIs to instruct Spark what to do. This brings us to our next chapter: the Structured APIs in Spark.
1 https://spark.apache.org/downloads.html
2 https://pypi.python.org/pypi/pyspark
3 https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
5 https://spark.apache.org/docs/1.1.0/cluster-overview.html
6 https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html
7 http://spark.apache.org/docs/latest/monitoring.html
8 https://qz.com/918008/the-color-distribution-of-mms-as-determined-by-a-phd-in-statistics/
9 https://community.cloud.databricks.com/?o=8599738367597028#notebook/4403727356442261/command/4403727356442270