Understanding Spark architecture

Parallel execution involves splitting the workload into subtasks that are executed in different threads or on different nodes. Let's see how Spark does this and how it manages execution and communication between the subtasks.

Task scheduling

How Spark splits the workload is determined by the number of partitions of the Resilient Distributed Dataset (RDD), the basic abstraction in Spark, and by the pipeline structure. An RDD represents an immutable, partitioned collection of elements that can be operated on in parallel. While the specifics might depend on the mode in which Spark runs, the following diagram captures the Spark task/resource scheduling:

Task scheduling

Figure 03-2. A generic Spark task-scheduling diagram. While not shown explicitly in the figure, Spark Context opens an HTTP UI, usually on port 4040 (concurrent contexts will open 4041, 4042, and so on), which is available for the duration of the task execution. The Spark Master UI is usually on port 8080 (although it is changed to 18080 in CDH) and the Worker UI is usually on port 7078. Each node can run multiple executors, and each executor can run multiple tasks.
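To make the partition-to-task relationship concrete, here is a minimal Scala sketch (the collection size and partition count are arbitrary); each partition of an RDD becomes one task per stage:

import org.apache.spark.{SparkConf, SparkContext}

// A small self-contained example; in spark-shell, sc is already created for you
val sc = new SparkContext(new SparkConf().setAppName("PartitionsExample").setMaster("local[2]"))

// Explicitly split the data into 4 partitions; each partition is processed by one task per stage
val rdd = sc.parallelize(1 to 1000000, 4)
println(rdd.partitions.length)            // 4
println(rdd.map(_ * 2).partitions.length) // narrow transformations preserve the partitioning

sc.stop()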

Tip

You will find that Spark, as well as Hadoop, has a lot of parameters. Some of them are specified as environment variables (refer to the $SPARK_HOME/conf/spark-env.sh file), and some can be given as command-line parameters. Moreover, some files with predefined names can contain parameters that will change the Spark behavior, such as core-site.xml. This might be confusing, and I will cover as much as possible in this and the following chapters. If you are working with Hadoop Distributed File System (HDFS), then the core-site.xml and hdfs-site.xml files will contain the pointer and specifications for the HDFS master. The requirement for picking up this file is that it has to be on the CLASSPATH of the Java process, which, again, may be set by specifying either the HADOOP_CONF_DIR or the SPARK_CLASSPATH environment variable. As is usual with open source, you sometimes need to grep the code to understand how various parameters work, so having a copy of the source tree on your laptop is a good idea.

Each node in the cluster can run one or more executors, and each executor can schedule a sequence of tasks to perform the Spark operations. The Spark driver is responsible for scheduling the execution and works with the cluster scheduler, such as Mesos or YARN, to allocate the available resources. The Spark driver usually runs on the client machine, but in the latest releases it can also run in the cluster under the cluster manager. YARN and Mesos have the ability to dynamically manage the number of executors that run concurrently on each node, subject to the resource constraints.

In the Standalone mode, Spark Master does the work of the cluster scheduler. It might be less efficient in allocating resources, but it is better than nothing in the absence of a preconfigured Mesos or YARN. The standard Spark distribution contains shell scripts to start Spark in Standalone mode in the sbin directory. Spark Master and the driver communicate directly with one or several Spark workers that run on individual nodes. Once the master is running, you can start Spark shell with the following command:

$ bin/spark-shell --master spark://<master-address>:7077

Tip

Note that you can always run Spark in local mode, which means that all tasks will be executed in a single JVM, by specifying --master local[2], where 2 is the number of threads to use, which has to be at least 2. In fact, we will be using the local mode very often in this book for running small examples.
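For example, a local-mode shell with two worker threads can be started as follows:

$ bin/spark-shell --master local[2]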

Spark shell is an application from the Spark point of view. Once you start a Spark application, you will see it under Running Applications in the Spark Master UI (or in the corresponding cluster manager), which can redirect you to the Spark application HTTP UI at port 4040, where you can see the subtask execution timeline and other important properties such as the environment settings, classpath, parameters passed to the JVM, and information on resource usage (refer to Figure 3-3):

Task scheduling

Figure 03-3. Spark Driver UI in Standalone mode with time decomposition

As we saw, with Spark, one can easily switch between local and cluster mode by providing the --master command-line option, setting the MASTER environment variable, or modifying spark-defaults.conf, which should be on the classpath during execution; the master can even be set explicitly using the setter methods on the SparkConf object directly in Scala (a short sketch follows the table), which will be covered later:

| Cluster Manager | MASTER env variable | Comments |
|---|---|---|
| Local (single node, multiple threads) | local[n] | n is the number of threads to use and should be greater than or equal to 2. If you want Spark to communicate with other Hadoop tools such as Hive, you still need to point it to the cluster by either setting the HADOOP_CONF_DIR environment variable or copying the Hadoop *-site.xml configuration files into the conf subdirectory. |
| Standalone (daemons running on the nodes) | spark://<master-address>:7077 | This has a set of start/stop scripts in the $SPARK_HOME/sbin directory. It also supports the HA mode. More details can be found at https://spark.apache.org/docs/latest/spark-standalone.html. |
| Mesos | mesos://host:5050 or mesos://zk://host:2181 (multi-master) | Here, you need to set MESOS_NATIVE_JAVA_LIBRARY=<path to libmesos.so> and SPARK_EXECUTOR_URI=<URL of spark-1.5.0.tar.gz>. The default is the fine-grained mode, where each Spark task runs as a separate Mesos task. Alternatively, the user can specify the coarse-grained mode, where the Mesos tasks persist for the duration of the application; the advantage is lower total start-up cost. Dynamic allocation (refer to the following URL) can be used in the coarse-grained mode. More details can be found at https://spark.apache.org/docs/latest/running-on-mesos.html. |
| YARN | yarn | The Spark driver can run either in the cluster or on the client node, which is managed by the --deploy-mode parameter (cluster or client; the shell can only run in the client mode). Set HADOOP_CONF_DIR or YARN_CONF_DIR to point to the YARN config files. Use the --num-executors flag or the spark.executor.instances property to set a fixed number of executors (the default). Set spark.dynamicAllocation.enabled to true to dynamically create/kill executors depending on the application demand. More details are available at https://spark.apache.org/docs/latest/running-on-yarn.html. |
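Here is the sketch mentioned above: a minimal example of setting the master explicitly via SparkConf in Scala (the application name is arbitrary, and in production code you would normally pass the master on the command line rather than hardcode it):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("Chapter03Example")
  .setMaster("local[2]")   // or, for example, "spark://<master-address>:7077"

val sc = new SparkContext(conf)
println(s"Connected to master ${sc.master} with default parallelism ${sc.defaultParallelism}")
sc.stop()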

The most commonly used ports are 8080, the master UI, and 4040, the application UI. Other Spark ports are summarized in the following tables:

Standalone ports

| From | To | Default Port | Purpose | Configuration Setting |
|---|---|---|---|---|
| Browser | Standalone Master | 8080 | Web UI | spark.master.ui.port / SPARK_MASTER_WEBUI_PORT |
| Browser | Standalone Worker | 8081 | Web UI | spark.worker.ui.port / SPARK_WORKER_WEBUI_PORT |
| Driver / Standalone Worker | Standalone Master | 7077 | Submit job to cluster / Join cluster | SPARK_MASTER_PORT |
| Standalone Master | Standalone Worker | (random) | Schedule executors | SPARK_WORKER_PORT |
| Executor / Standalone Master | Driver | (random) | Connect to application / Notify executor state changes | spark.driver.port |

Other ports

| From | To | Default Port | Purpose | Configuration Setting |
|---|---|---|---|---|
| Browser | Application | 4040 | Web UI | spark.ui.port |
| Browser | History server | 18080 | Web UI | spark.history.ui.port |
| Driver | Executor | (random) | Schedule tasks | spark.executor.port |
| Executor | Driver | (random) | File server for files and jars | spark.fileserver.port |
| Executor | Driver | (random) | HTTP broadcast | spark.broadcast.port |

Also, some of the documentation is available with the source distribution in the docs subdirectory, but it may be out of date.

Spark components

Since the emergence of Spark, multiple applications that benefit from Spark's ability to cache RDDs have been written: Shark, Spork (Pig on Spark), graph libraries (GraphX, GraphFrames), streaming, MLlib, and so on; some of these will be covered here and in later chapters.

In this section, I will cover major architecture components to collect, store, and analyze the data in Spark. While I will cover a more complete data life cycle architecture in Chapter 2, Data Pipelines and Modeling, here are Spark-specific components:

Spark components

Figure 03-4. Spark architecture and components.

MQTT, ZeroMQ, Flume, and Kafka

All of these are different ways to reliably move data from one place to another without loss or duplication. They usually implement a publish-subscribe model, where multiple writers and readers can write to and read from the same queues with different guarantees. Flume stands out as one of the first distributed log and event management implementations, but it is slowly being replaced by Kafka, a fully functional publish-subscribe distributed message queue, optionally persistent across a distributed set of nodes, developed at LinkedIn. We covered Flume and Kafka briefly in the previous chapter. Flume configuration is file-based and is traditionally used to deliver messages from a Flume source to one or several Flume sinks. One of the popular sources is netcat, which listens for raw data on a port. For example, the following configuration describes an agent receiving data and then writing it to HDFS every 30 seconds (the default):

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 4987

# Describe the sink (the instructions to configure and start HDFS are provided in the Appendix)
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://localhost:8020/flume/netcat/data
a1.sinks.k1.hdfs.filePrefix=chapter03.example
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.writeFormat = Text

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

This file is included as part of the code provided with this book in the chapter03/conf directory. Let's download and start the Flume agent (check the MD5 sum against the one provided at http://flume.apache.org/download.html):

$ wget http://mirrors.ocf.berkeley.edu/apache/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz
$ md5sum apache-flume-1.6.0-bin.tar.gz
MD5 (apache-flume-1.6.0-bin.tar.gz) = defd21ad8d2b6f28cc0a16b96f652099
$ tar xf apache-flume-1.6.0-bin.tar.gz
$ cd apache-flume-1.6.0-bin
$ ./bin/flume-ng agent -Dlog.dir=. -Dflume.log.level=DEBUG,console -n a1 -f ../chapter03/conf/flume.conf
Info: Including Hadoop libraries found via (/Users/akozlov/hadoop-2.6.4/bin/hadoop) for HDFS access
Info: Excluding /Users/akozlov/hadoop-2.6.4/share/hadoop/common/lib/slf4j-api-1.7.5.jar from classpath
Info: Excluding /Users/akozlov/hadoop-2.6.4/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar from classpath
...

Now, in a separate window, you can type a netcat command to send text to the Flume agent:

$ nc localhost 4987
Hello
OK
World
OK

...

The Flume agent will first create a *.tmp file and then rename it to a file without the extension (the file extension can be used to filter out files that are still being written to):

$ bin/hdfs dfs -text /flume/netcat/data/chapter03.example.1463052301372
16/05/12 04:27:25 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
1463052302380  Hello
1463052304307  World

Here, each row contains a Unix time in milliseconds and the data received. In this case, we put the data into HDFS, from where it can be analyzed by a Spark/Scala program; we can exclude the files still being written to by the *.tmp filename pattern. However, if you are really interested in up-to-the-last-minute values, Spark, as well as some other platforms, supports streaming, which I will cover in a few sections.
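As a quick illustration, here is a minimal Spark/Scala sketch of reading these events. It assumes the sink kept Flume's default SequenceFile container (a LongWritable timestamp key and a Text body, which matches what hdfs dfs -text displayed above) and that the HDFS URL is the one from the Flume configuration:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("FlumeNetcatReader").setMaster("local[2]"))

// Read the files written by the Flume HDFS sink; in a real job you would also
// exclude the *.tmp files that Flume is still writing to
val events = sc
  .sequenceFile("hdfs://localhost:8020/flume/netcat/data", classOf[LongWritable], classOf[Text])
  .map { case (ts, body) => (ts.get, body.toString) }   // convert Writables to serializable types

events.collect().foreach { case (ts, body) => println(s"$ts\t$body") }
sc.stop()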

HDFS, Cassandra, S3, and Tachyon

HDFS, Cassandra, S3, and Tachyon are different ways to get the data into persistent storage and to the compute nodes as necessary, with different guarantees. HDFS is a distributed storage implemented as a part of Hadoop that serves as the backend for many products in the Hadoop ecosystem. HDFS divides each file into blocks, which are 128 MB in size by default, and stores each block on at least three nodes. Although HDFS is reliable and supports HA, a general complaint about HDFS storage is that it is slow, particularly for machine learning purposes. Cassandra is a general-purpose key/value storage that also stores multiple copies of each row and can be configured to support different levels of consistency to optimize read or write speeds. The advantage Cassandra has over the HDFS model is that it does not have a central master node; reads and writes are completed based on a consensus algorithm. This, however, may sometimes reflect on Cassandra's stability. S3 is the Amazon storage: the data is stored off-cluster, which affects the I/O speed. Finally, the recently developed Tachyon claims to utilize the nodes' memory to optimize access to working sets across the nodes.
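From the Spark API's point of view, most of these backends look the same; the storage is chosen by the URI scheme of the path. The paths and host names below are placeholders, and S3 access additionally requires AWS credentials in the Hadoop configuration:

// The same RDD API regardless of the storage backend; only the URI scheme changes
val fromHdfs    = sc.textFile("hdfs://namenode:8020/data/events")
val fromS3      = sc.textFile("s3n://my-bucket/data/events")
val fromLocal   = sc.textFile("file:///tmp/data/events")
val fromTachyon = sc.textFile("tachyon://tachyon-master:19998/data/events")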

Additionally, new backends are being constantly developed, for example, Kudu from Cloudera (http://getkudu.io/kudu.pdf) and Ignite File System (IGFS) from GridGain (http://apacheignite.gridgain.org/v1.0/docs/igfs). Both are open source and Apache-licensed.

Mesos, YARN, and Standalone

As we mentioned before, Spark can run under different cluster resource schedulers. These are various implementations to schedule Spark's containers and tasks on the cluster. The schedulers can be viewed as cluster kernels, performing functions similar to the operating system kernel: resource allocation, scheduling, I/O optimization, application services, and UI.

Mesos is one of the original cluster managers and is built using the same principles as the Linux kernel, only at a different level of abstraction. A Mesos slave runs on every machine and provides APIs for resource management and scheduling across entire datacenter and cloud environments. Mesos is written in C++.

YARN is a more recent cluster manager, developed at Yahoo. Each node in YARN runs a Node Manager, which communicates with a Resource Manager that may run on a separate node. The Resource Manager schedules tasks to satisfy memory and CPU constraints. The Spark driver itself can run in the cluster, which is called the cluster mode for YARN; otherwise, in the client mode, only the Spark executors run in the cluster, and the driver that schedules the Spark pipelines runs on the same machine that runs the Spark shell or the submit program. In this case, the Spark executors talk to the driver on the client host over a random open port. YARN is written in Java, with the consequence of unpredictable GC pauses, which might make the latency's long tail fatter.
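As an illustration of the two modes, the submission commands might look roughly as follows (the jar and class names are hypothetical):

$ bin/spark-submit --master yarn --deploy-mode client --class com.example.MyApp myapp.jar
$ bin/spark-submit --master yarn --deploy-mode cluster --num-executors 4 --class com.example.MyApp myapp.jar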

Finally, if none of these resource schedulers are available, the standalone deployment mode starts an org.apache.spark.deploy.worker.Worker process on each node, which communicates with the Spark Master process running as org.apache.spark.deploy.master.Master. The worker process is completely managed by the master and can run multiple executors and tasks (refer to Figure 3-2).
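For reference, a manual Standalone start using the scripts shipped in the sbin directory looks roughly like this: run the first command on the master node and the second on each worker node (the master address is a placeholder):

$ sbin/start-master.sh
$ sbin/start-slave.sh spark://<master-address>:7077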

In practical implementations, it is advisable to track the program's parallelism and required resources through the driver's UI and to adjust the parallelism and available memory, increasing the parallelism if necessary. In the following section, we will start looking at how Scala, and Scala in Spark, addresses different problems.
