Chapter 2. Downloading Apache Spark and Getting Started

In this chapter, we will focus on the ease of downloading Spark and walk through three simple steps for you as a developer—whether a data scientist or data engineer—to get started writing your first standalone application.

We will use local mode, one of the preferred ways to learn Spark easily, because it gives you a quick feedback loop for iteratively performing Spark operations. Using the Spark shells is a quick way to prototype, learn, or test Spark operations with small datasets before writing a complex Spark application. But for large datasets or real work where you want to reap the benefits of distributed execution, local mode is not suitable; keep this mode strictly for learning and rapid prototyping.

While the Spark shells only support Scala, Python, and R, you can write a Spark application in any of the supported languages—Scala, Java, Python, and R—along with issuing queries in Spark SQL. We do expect some familiarity with the language of your choice to use Spark.

Step 1: Download Apache Spark

First, go to https://spark.apache.org/downloads.html, select “Prebuilt For Hadoop 2.7 and later” from the pull-down menu, and click link 3, “Download Spark” (Figure 2-1).

Note: At the time of publishing this book, Apache Spark 3.0 was still in preview mode, but you can download the latest Spark 3.0 using the same download method and instructions.

Figure 2-1. Download page on spark.apache.org

This will download the tarball spark-3.0.0-preview2-bin-hadoop2.7.tgz. This tarball contains all the Hadoop-related binaries you will need to run Spark in local mode on your laptop; if you are going to install Spark on an existing HDFS or Hadoop installation, you can select the matching Hadoop version from the pull-down menu. How to build from source is beyond the scope of this book, but you can read more about it in the documentation.1

Since the release of Apache Spark 2.2, Python developers who only care about learning Spark in Python can install PySpark from the PyPI repository.2 If you install just PySpark, you do not have to install all the other libraries necessary to run Scala, Java, or R, which also makes the binary smaller.

To install PySpark from PyPI, just run:

pip install pyspark

Note: If you only install PySpark with pip, you will also need to install Java 8 or above on your machine and set the JAVA_HOME environment variable. See the instructions on how to download and install Java.3 If you want to run R in an interpretive shell mode, you must install R4 and then run:

sparkR
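If you install PySpark from PyPI, you can use it not only through the pyspark shell but also as an ordinary Python library. As a quick sanity check of the installation, the following minimal sketch (the script name check_install.py is just an illustrative choice) creates a SparkSession in local mode and prints the Spark version:

# check_install.py: quick sanity check of a pip-installed PySpark.
# Requires Java 8 or above and JAVA_HOME set, as noted above.
from pyspark.sql import SparkSession

spark = (SparkSession
         .builder
         .master("local[*]")          # run locally, using all available cores
         .appName("InstallCheck")
         .getOrCreate())

print(spark.version)                  # e.g., '3.0.0-preview2'
spark.stop()

Run it with python check_install.py; if it prints a version string, the installation is working.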

Spark’s Directories and Files

We assume that you are running a version of Linux or OS X on your laptop or cluster, and all the commands and instructions in this book are in that flavor. Once you have downloaded the tarball, extract its contents and cd into that directory (Figure 2-2).

➜ spark-3.0.0-preview2-bin-hadoop2.7 ls
      LICENSE R RELEASE conf examples kubernetes python yarn
      NOTICE README.md bin data jars licenses sbin

Figure 2-2 Unpacking Spark binaries and list of important files and directories

Let’s briefly summarize the intent and purpose of some of these files and directories. New files and directories have been added in Spark 2.x and 3.0, and the contents and descriptions of some existing ones have changed too.

README.md

This file contains detailed instructions on how to use the Spark shells, build Spark from source, run standalone Spark examples, peruse links to the Spark documentation and configuration guides, and contribute to Spark.

bin

This directory, as the name suggests, contains most of the scripts you’ll employ to interact with Spark, including the Spark shells such as spark-sql, pyspark, and spark-shell. We will use these shells and scripts later in this chapter to submit a standalone Spark application using spark-submit, and to use a script that builds and pushes Docker images when running Spark with Kubernetes support.

sbin

Most of the scripts in this directory are administrative in purpose, for starting and stopping Spark components in the cluster in its various deployment modes. For the deployment modes, see the cheat sheet in Table 1-1 in Chapter 1.

kubernetes

Added when Spark gained Kubernetes support, this directory contains Dockerfiles for creating Docker images for your Spark distribution on a Kubernetes cluster. It also outlines instructions on how to build the Spark distribution before building your Docker images.

data

This directory is populated with *.txt files that serve as input for Spark’s components: MLlib, Streaming, and GraphX.

examples

For any developer, two imperatives that ease the journey to learn any new platform are: 1) loads of code examples illustrating how to do something, and 2) comprehensive documentation. With Spark, these examples span Java, Python, R, and Scala, and you will want to employ them when learning Spark. We will allude to some of these examples in this and subsequent chapters.

Step 2: Use Scala Shell or PySpark Shell

As mentioned earlier, Spark comes with three widely used interpreters that act like interactive “shells” that enable ad hoc data analysis: pyspark, spark-shell, and sparkR. In many ways, their interactivity imitates shells you are already familiar with if you have used Python, Scala, R, SQL, or UNIX operating system shells such as bash or Bourne shell.

These shells have been augmented to support connecting to a cluster and to allow you to load distributed data into the memory of Spark’s workers. Whether you are dealing with large datasets or small ones, Spark shells are conducive to learning Spark quickly.

Let’s just dive into them. The best way to learn anything is by doing it.

To start PySpark, cd to the bin directory and launch a shell by typing pyspark. If you have installed PySpark from PyPI, then just typing pyspark from any directory will suffice.

➜ spark-3.0.0-preview2-bin-hadoop2.7 pyspark
        Python 3.7.3 (default, Mar 27 2019, 09:23:15)
        [Clang 10.0.1 (clang-1001.0.46.3)] on darwin
        Type "help", "copyright", "credits" or "license" for more information.
        WARNING: An illegal reflective access operation has occurred
        WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/Users/julesdamji/spark/spark-3.0.0-preview2-bin-hadoop2.7/jars/spark-unsafe_2.12-3.0.0-preview2.jar) to constructor java.nio.DirectByteBuffer(long,int)
        WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
        WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
        WARNING: All illegal access operations will be denied in a future release
        20/01/10 09:57:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
        Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
        Setting default log level to "WARN".
        To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
        Welcome to
         ____ __
         / __/__ ___ _____/ /__
         _ / _ / _ `/ __/ '_/
         /__ / .__/\_,_/_/ /_/\_ version 3.0.0-preview2
         /_/
        Using Python version 3.7.3 (default, Mar 27 2019 09:23:15)
        SparkSession available as 'spark'.
        >>> spark.version
        '3.0.0-preview2'
        >>>

Figure 2-3 PySpark interpretive shell launched in local mode on a laptop

To start a similar Spark shell with Scala, cd to the bin directory and type spark-shell.

➜ spark-3.0.0-preview2-bin-hadoop2.7 spark-shell
        WARNING: An illegal reflective access operation has occurred
        WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/Users/julesdamji/spark/spark-3.0.0-preview2-bin-hadoop2.7/jars/spark-unsafe_2.12-3.0.0-preview2.jar) to constructor java.nio.DirectByteBuffer(long,int)
        WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
        WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
        WARNING: All illegal access operations will be denied in a future release
        20/01/10 10:00:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
        Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
        Setting default log level to "WARN".
        To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
        Spark context Web UI available at http://10.0.1.2:4040
        Spark context available as 'sc' (master = local[*], app id = local-1578679214780).
        Spark session available as 'spark'.
        Welcome to
         ____ __
         / __/__ ___ _____/ /__
         _ / _ / _ `/ __/ '_/
         /___/ .__/\_,_/_/ /_/\_ version 3.0.0-preview2
         /_/
        Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 12.0.1)
        Type in expressions to have them evaluated.
        Type :help for more information.
        scala> spark.version
        res0: String = 3.0.0-preview2
        scala>

Figure 2-4 Spark Scala interpretive shell launched in local mode on a laptop

Using Local Machine

We downloaded and installed Spark on our local machine or laptop. For the remainder of this chapter, we will use Spark interpretive shells locally. That is, Spark will be running in local mode.

Note: Refer to the deployment cheat sheet in Table 1-1 (Chapter 1) for a reminder of which components run where in local mode.

As noted in the previous chapter, Spark computations are expressed as operations. These operations are then converted into low-level RDD-based bytecode as tasks, which are then distributed to Spark’s workers for execution.

Let’s look at a short example where we read in a text file as a DataFrame, show a sample of the strings read, and count the total number of lines in the file. This simple example illustrates the high-level Structured APIs (which we will cover in the next chapter). The show(10, false) operation on the DataFrame displays only the first 10 lines without truncating; by default the truncate flag is true.

scala> val strings = spark.read.text("README.md")
          strings: org.apache.spark.sql.DataFrame = [value: string]
          scala> strings.show(10, false)
          +--------------------------------------------------------------------------------+
          |value |
          +--------------------------------------------------------------------------------+
          |# Apache Spark |
          | |
          |Spark is a unified analytics engine for large-scale data processing. It provides|
          |high-level APIs in Scala, Java, Python, and R, and an optimized engine that |
          |supports general computation graphs for data analysis. It also supports a |
          |rich set of higher-level tools including Spark SQL for SQL and DataFrames, |
          |MLlib for machine learning, GraphX for graph processing, |
          |and Structured Streaming for stream processing. |
          | |
          |<https://spark.apache.org/> |
          +--------------------------------------------------------------------------------+
          only showing top 10 rows
          scala> strings.count()
          res2: Long = 109
          scala>

Example 2-1 Scala line count

Quite simple. Let’s look at a similar example using the Python interpretive shell, pyspark.

➜ spark-3.0.0-preview2-bin-hadoop2.7 pyspark
          Python 3.7.3 (default, Mar 27 2019, 09:23:15)
          [Clang 10.0.1 (clang-1001.0.46.3)] on darwin
          Type "help", "copyright", "credits" or "license" for more information.
          WARNING: An illegal reflective access operation has occurred
          WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/Users/julesdamji/spark/spark-3.0.0-preview2-bin-hadoop2.7/jars/spark-unsafe_2.12-3.0.0-preview2.jar) to constructor java.nio.DirectByteBuffer(long,int)
          WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
          WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
          WARNING: All illegal access operations will be denied in a future release
          20/01/10 11:28:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
          Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
          Setting default log level to "WARN".
          To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
          Welcome to
           ____ __
           / __/__ ___ _____/ /__
           _ / _ / _ `/ __/ '_/
           /__ / .__/\_,_/_/ /_/\_ version 3.0.0-preview2
           /_/
          Using Python version 3.7.3 (default, Mar 27 2019 09:23:15)
          SparkSession available as 'spark'.
          >>> strings = spark.read.text("README.md")
          >>> strings.show(10, truncate=False)
          +--------------------------------------------------------------------------------+
          |value |
          +--------------------------------------------------------------------------------+
          |# Apache Spark |
          | |
          |Spark is a unified analytics engine for large-scale data processing. It provides|
          |high-level APIs in Scala, Java, Python, and R, and an optimized engine that |
          |supports general computation graphs for data analysis. It also supports a |
          |rich set of higher-level tools including Spark SQL for SQL and DataFrames, |
          |MLlib for machine learning, GraphX for graph processing, |
          |and Structured Streaming for stream processing. |
          | |
          |<https://spark.apache.org/> |
          +--------------------------------------------------------------------------------+
          only showing top 10 rows
          >>> strings.count()
          109
          >>>

Example 2-2 Python line count

To exit any of the Spark shells, press Ctrl-D. As you can see, this rapid interactivity with Spark shells is conducive not only to rapid learning but to rapid prototyping, too.

In both examples above, notice the API syntax and signature parity across both Scala and Python. Throughout Spark’s evolution from 1.x, that has been one of the enduring improvements, among many.

Also, we used the high-level Structured APIs to read a text file into a Spark DataFrame rather than an RDD. Throughout the book, we will focus more on high-level Structured APIs since in Spark 2.x, RDDs are now consigned to low-level APIs.

Note: Every computation expressed with the high-level Structured APIs is decomposed into low-level, optimized and generated RDD operations and then converted into Scala bytecode for the executors’ JVMs. This generated RDD operation code is not accessible to users, nor is it the same as the user-facing RDD APIs.
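For comparison, here is a minimal sketch (run inside the shell, where spark is already defined) that performs the same line count with the user-facing low-level RDD API and with the high-level Structured API; both calls are standard PySpark, and throughout the book we will favor the DataFrame form:

# Low-level RDD API: returns an RDD of strings, one per line
rdd_lines = spark.sparkContext.textFile("README.md")
print(rdd_lines.count())

# High-level Structured API: returns a DataFrame with a single 'value' column
df_lines = spark.read.text("README.md")
print(df_lines.count())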

Step 3: Understand Spark Application Concepts

Now that we have downloaded Spark, installed it on our laptop, launched the Spark shells, and executed short code examples interactively, let’s take the final step.

To get a glimpse of what happens under the hood with our small sample code, let’s first understand the key concepts of a Spark application and learn how the code is transformed and executed as tasks across the Spark executors. In the discussion below you’ll come across a number of terms and concepts, so we have summarized their definitions in the table below.5

Application
A user program built on Spark using its APIs. It consists of a driver program and executors on the cluster.

SparkSession
An object that provides a point of entry to interact with underlying Spark functionality and allows programming Spark with its APIs. In an interactive Spark shell, the Spark driver instantiates a SparkSession for you, while in a Spark application, you create a SparkSession object.

Job
A parallel computation consisting of multiple tasks that gets spawned in response to a Spark action (e.g., save, collect).

Stage
Each job gets divided into smaller sets of tasks called stages that depend on each other.

Task
A single unit of work or execution that will be sent to a Spark executor.

Table 2-1 Definitions for some terms and concepts

Spark Application and SparkSession

At the core of every Spark application is the Spark driver program, which creates a SparkSession object. When you are working with a Spark shell, the driver is part of the shell and the SparkSession object (accessible via the variable spark) is created for you, as shown in Figure 2-4.

Because you launched the Spark shell locally on your laptop, all the operations from our code sample ran locally, in a single JVM. But you can just as easily launch a Spark shell to analyze data in parallel on a cluster as in local mode; spark-shell --help or pyspark --help will show you how to connect to the Spark cluster manager. Figure 2-5 shows how Spark executes on a cluster once it has connected to the cluster manager.

Figure 2-5. Spark components communicate through the Spark driver in Spark’s distributed architecture

Once we have a SparkSession, we can program Spark using the APIs to perform Spark operations.6
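In a Spark shell the SparkSession is created for you, but in a standalone application you build one yourself. A minimal PySpark sketch (the application name HelloSpark is an illustrative choice, not something from the Spark distribution) looks like this:

from pyspark.sql import SparkSession

# Build (or reuse) a SparkSession: the single point of entry to Spark's APIs
spark = (SparkSession
         .builder
         .appName("HelloSpark")
         .getOrCreate())

# Use the session to perform Spark operations
df = spark.range(5)      # a small DataFrame with one 'id' column
df.show()

# Shut down the underlying SparkContext when you are done
spark.stop()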

Spark Jobs

During interactive sessions with Spark shells, the driver converts your Spark application into one or more Spark jobs (Figure 2-6). It then transforms each job into a directed acyclic graph (DAG). This, in essence, is Spark’s execution plan, where each node within a DAG could be a single or multiple Spark stages.

Figure 2-6. Spark driver creating one or more Spark jobs

Spark Stages

Stages are created as part of the DAG nodes based on what operations can be performed serially or in parallel. Not all Spark operations can happen in a single stage, so they may be divided into multiple stages. Stages are often delineated at the operator’s computation boundaries, where they dictate data transfer among Spark executors.

Figure 2-7. Spark job creating one or more stages

Spark Tasks

Each stage is comprised of Spark tasks (a unit of execution), which are then federated across the Spark executors; each task maps to a single core and works on a single partition of data. As such, an executor with 16 cores can have 16 or more tasks working on 16 or more partitions in parallel, making the execution of Spark’s tasks highly parallel!

Figure 2-8. Spark stage creating one or more tasks to be distributed to executors

Transformations, Actions, and Lazy Evaluation

Spark operations on distributed data can be classified into two types: transformations and actions. Transformations, as the name suggests, transform a Spark DataFrame into a new DataFrame without altering the original data, giving it the property of immutability. Put another way, an operation such as select() or filter() will not change the original DataFrame; instead, it will return the transformed results of the operation as a new DataFrame.

All transformations are evaluated lazily. That is, their results are not computed immediately; instead, they are recorded or remembered as a lineage. A recorded lineage allows Spark, at a later time in its execution plan, to rearrange certain transformations, coalesce them, or optimize them into stages for more efficient execution. Lazy evaluation is Spark’s strategy for delaying execution until an action is invoked or data is “touched” (read from or written to disk).

Figure 2-9. Transformations, actions, and lazy evaluation as Spark operations

By contrast, an action triggers the lazy evaluation of all the transformations recorded. In the above figure, all transformations T are recorded until the action A is invoked. Each transformation T produces a new DataFrame.

While lazy evaluation allows Spark to optimize your queries by peeking into your chained transformations, lineage and data immutability provide fault tolerance. Because Spark records each transformation in its lineage and the DataFrames are immutable between transformations, it can reproduce its original state by simply replaying the recorded lineage, giving it resiliency in the event of failures.
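You can observe laziness directly in the shell: chaining transformations returns immediately because Spark only records the plan, and you can ask the DataFrame to print that recorded plan with explain(). Here is a minimal sketch reusing the README.md example from earlier:

strings = spark.read.text("README.md")
filtered = strings.filter(strings.value.contains("Spark"))

# No job has run yet; explain() merely prints the plan Spark has recorded
filtered.explain()

# The action below finally triggers execution of the recorded plan
print(filtered.count())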

Some examples of transformations and actions are:

Transformations    Actions
orderBy            show
groupBy            take
filter             count
select             collect
join               save

Table 2-2 Transformations and Actions as Spark operations

Together with actions, transformations contribute to a Spark query plan (we will cover query plans in the next chapter), but nothing is executed until an action is invoked. For example, Python Example 2-1 below has two transformations (read() and filter()) and one action (count()):

Python Example 2-1

>>> strings = spark.read.text("README.md") 
          >>> filtered = strings.filter(strings.value.contains("Spark"))
          >>> filtered.count()
          20

An action triggers the execution of all the transformations recorded as part of the query execution plan (see Chapter 3 for more on query execution plans). In the example above, nothing happens until filtered.count() is executed in the shell.

Narrow and Wide Transformations

As noted above, transformations are operations that Spark evaluates lazily. A huge advantage of the lazy evaluation scheme is that Spark can inspect your computational query and ascertain how to optimize it. This optimization can be done by either joining or pipelining some operations and assigning them to a stage, or by breaking them into multiple stages by determining which operations require a shuffle or exchange of data across the cluster.

Transformations can be classified as having either narrow dependencies or wide dependencies. Any transformation where a single output partition can be computed from a single input partition is a narrow transformation. For example, in the code snippet above, filter() and contains() represent narrow transformations because they can operate on a single partition and produce the resulting output partition without any exchange of data.

However, groupBy() or orderBy(), as shown in Figure 2-10, instructs Spark to perform a wide transformation, where data from other partitions is read in, combined, and written to disk. Since each partition will have its own count of the rows that contain the word “Spark”, a count following a groupBy() will force a shuffle of data from each of the executors’ partitions across the cluster. In this transformation, the count requires output from other partitions to compute the final aggregation.

Figure 2-10. Transformations with narrow versus wide dependencies
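To make the distinction concrete, here is a minimal PySpark sketch that continues with the README.md DataFrame: the filter() is a narrow transformation, while the groupBy() followed by count() is a wide transformation that forces a shuffle (grouping the lines by their length is just an arbitrary illustrative choice):

from pyspark.sql.functions import col, length

strings = spark.read.text("README.md")

# Narrow: each input partition can be filtered independently
spark_lines = strings.filter(col("value").contains("Spark"))

# Wide: grouping requires combining rows from all partitions (a shuffle)
counts_by_length = (spark_lines
                    .groupBy(length(col("value")).alias("line_length"))
                    .count())

counts_by_length.show(5)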

Spark UI

Spark includes a graphical user interface that you can use to inspect or monitor Spark applications in their various stages of decomposition. Depending on how Spark is deployed, the driver launches a web UI, running by default on port 4040, where you can view metrics and details such as:7

a list of scheduler stages and tasks;

a summary of RDD sizes and memory usage;

environmental information;

information about the running executors; and

all the Spark SQL queries.

In local mode, you can access this interface at http://localhost:4040 in a web browser.

Note: When you launch spark-shell, part of the output shows the localhost URL to access at port 4040.

So let’s inspect how our short Python Example 2-1 from the Spark shell translates into jobs, stages, and tasks, and see what our DAG looks like. As Figure 2-11 shows, the driver created a single job and a single stage.

Figure 2-11. A single job and a single stage for our Python Example 2-1

If we observe Figure 2-11, we notice that there is no exchange required since there is only a single stage.

Stage 0 is comprised of three tasks. These tasks will be executed in parallel.

Figure 2-12. Details of stage 0 and its tasks for our Python Example 2-1

In these figures above, notice how each stage has its individual operations shown as part of darker blue boxes.

We will cover how to read these stages, tasks, shuffles, and partitions in the Spark UI in detail in Chapter 8. For now, just note that the Spark UI provides a microscopic lens into Spark’s internal workings as a tool for debugging and inspection.

Databricks Community Edition

Aside from using your local machine to run Spark in local mode, you can try any of the examples in this chapter, or any other chapter, on the Databricks Community Edition for free (Figure 2-13). As a learning tool for Apache Spark, the Community Edition has many tutorials and examples worthy of note. As well as writing your own notebooks in Python, R, Scala, or SQL, you can also import other notebooks, including Jupyter notebooks.

Note: Databricks is the company that offers managed Apache Spark in the cloud. The Community Edition is a free version to learn Spark.

Figure 2-13. Databricks Community Edition

To get an account, go to https://databricks.com/try, and follow the instructions to try the Community Edition for free.

First Standalone Application

For your learning and exploration, the Spark distribution comes with a set of sample applications for each of Spark’s components. You are welcome to peruse the examples directory in your installation location. For simplicity, we will use the “Hello, World” of distributed computing, word count, which we are sure you have never heard of!

Using Local Machine

From your local machine in the installation directory, you can run one of the several Java or Scala sample programs provided, using bin/run-example <class> [params]. For example:

./bin/run-example JavaWordCount README.md

This will spew out INFO messages on your console, along with a list of each word in README.md and its count.
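If you would rather see word count expressed with the high-level Structured APIs in Python, here is a minimal sketch; it is a simple approximation for illustration, not the example bundled with the distribution, and splitting on whitespace is an assumption:

# wordcount.py: a minimal word count sketch using the DataFrame API
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, split

spark = SparkSession.builder.appName("WordCount").getOrCreate()

lines = spark.read.text("README.md")

# Split each line into words, explode to one word per row, then count
word_counts = (lines
               .select(explode(split(col("value"), r"\s+")).alias("word"))
               .where(col("word") != "")
               .groupBy("word")
               .count()
               .orderBy(col("count").desc()))

word_counts.show(10, truncate=False)
spark.stop()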

Building Standalone Applications in Scala

We will now cover how to build your first Scala Spark program using the Scala Build Tool (sbt).10 Since Python is an interpreted language and there is no compilation step (though it’s possible to compile your Python code into bytecode in .pyc files), we will not go into that step here. To use Maven to build Java Spark programs, we refer you to the guide on the Apache Spark website.11 For brevity in this book, we cover examples mainly in Python and Scala.

build.sbt is the specification file that, like a makefile, directs the Scala compiler to build your Scala-related tasks, such as jars and packages, and describes what dependencies to resolve and where to look for them. In our case, we have a simple sbt file for our M&M code.

Example 2-13 sbt build file.

//name of the package
          name := "main/scala/chapter2"
          //version of our package
          version := "1.0"
          //version of Scala
          scalaVersion := "2.12.10"
          // spark library dependencies
          libraryDependencies ++= Seq(
           "org.apache.spark" %% "spark-core" % "3.0.0-preview2",
           "org.apache.spark" %% "spark-sql" % "3.0.0-preview2"
          )

Assuming that you have the JDK and sbt installed, and JAVA_HOME and SPARK_HOME set, you can build your Spark application with a single command:

$ sbt clean package
          [info] Updated file /Users/julesdamji/gits/LearningSparkV2/chapter2/scala/project/build.properties: set sbt.version to 1.2.8
          [info] Loading project definition from /Users/julesdamji/gits/LearningSparkV2/chapter2/scala/project
          [info] Updating 
          [info] Done updating.
          ...
          [info] Compiling 1 Scala source to /Users/julesdamji/gits/LearningSparkV2/chapter2/scala/target/scala-2.12/classes ...
          [info] Done compiling.
          [info] Packaging /Users/julesdamji/gits/LearningSparkV2/chapter2/scala/target/scala-2.12/main-scala-chapter2_2.12-1.0.jar ...
          [info] Done packaging.
          [success] Total time: 6 s, completed Jan 11, 2020, 4:11:02 PM

After a successful build, you can run the above Scala version of M&M count example as follows:

$SPARK_HOME/bin/spark-submit --class main.scala.chapter2.MnMcount jars/main-scala-chapter2_2.12-1.0.jar data/mnm_dataset.csv
          …
          …
          20/01/11 16:00:48 INFO TaskSchedulerImpl: Killing all running tasks in stage 4: Stage finished
          20/01/11 16:00:48 INFO DAGScheduler: Job 4 finished: show at MnMcount.scala:49, took 0.264579 s
          +-----+------+-----+
          |State| Color|Total|
          +-----+------+-----+
          | CA|Yellow| 1807|
          | CA| Green| 1723|
          | CA| Brown| 1718|
          | CA|Orange| 1657|
          | CA| Red| 1656|
          | CA| Blue| 1603|
          +-----+------+-----+

This run will produce the same output as you would get from the Python version of the program. Try it!
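The Python version of the M&M count is not shown in this excerpt, but a hedged sketch of what such an aggregation might look like follows; the input column names State, Color, and Count are assumptions for illustration, and this is not the book’s actual mnmcount.py:

# A hedged sketch of an M&M count aggregation in PySpark (illustrative only)
from pyspark.sql import SparkSession
from pyspark.sql.functions import count

spark = SparkSession.builder.appName("MnMCount").getOrCreate()

# Read the CSV with a header and inferred types; column names are assumed
mnm_df = (spark.read
          .option("header", "true")
          .option("inferSchema", "true")
          .csv("data/mnm_dataset.csv"))

# Aggregate counts per state and color, highest totals first
count_df = (mnm_df
            .groupBy("State", "Color")
            .agg(count("Count").alias("Total"))
            .orderBy("Total", ascending=False))

count_df.show(6)
spark.stop()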

So there you have it. Our data scientist author will be more than happy to use this data to decide what types of M&Ms to use in the cookies for her next class in any one of the states she visits.

Summary

In this chapter, we have covered three simple steps for you to get started: downloading Apache Spark; using Scala or PySpark interactive shells; and understanding high-level Spark application concepts and terms—all imperative to commence your developer journey to learn Spark. We gave a quick overview of the process by which you can use transformations and actions to write a Spark application. We briefly introduced the Spark UI to examine the relevant jobs, stages, and tasks created.

And finally, through a short example, we showed you how you can use the high-level Structured APIs to instruct Spark what to do. This brings us to our next chapter, on the Structured APIs in Spark.
