Working in Spark doesn’t mean limiting yourself to Scala, or even limiting yourself to the JVM, or languages that Spark explicitly supports.
Spark has first-party APIs for writing driver programs and worker code in R, Python, Scala, and Java, with third-party bindings for additional languages including JavaScript, Julia, C#, and F#.
Spark’s language interoperability can be thought of in two tiers:
one is the worker code inside of your transformations (e.g., the lambdas inside of your maps), and the second is being able to specify the transformations on RDDs/Datasets (e.g., the driver program).
This chapter will discuss the performance considerations of using other languages in Spark, and how to effectively work with existing libraries.
Often the language you choose for the code inside of your transformations will be the same as the language for writing the driver program, but when working with specialized libraries or tools (such as CUDA), specifying your entire program in one language would be a hassle, even if it were possible. Spark supports a range of languages for use on the driver, and an even wider range of languages can be used inside of your transformations on the workers. While the APIs are similar between the languages, the performance characteristics of the different languages are quite different once they need to execute outside of the JVM. We will discuss the design behind language support and how the performance difference can impact your work.
Generally the non-JVM language binding calls the Java interface for Spark using an RPC mechanism, such as Py4J, passing along a serialized representation of the code to be executed on the worker. Regardless of the language used to specify the driver program, the Spark workers will execute in the JVM and, if necessary, call the language-specific worker program. If the language you’re looking for doesn’t have a Spark driver binding available, remember you can write your transformations to call another language on the workers.
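To make the idea of passing along a serialized representation of the code more concrete, here is a minimal sketch in plain Python with no Spark involved. It assumes a closure-free function and the same Python version on both sides. The helper names serialize_function and deserialize_function are hypothetical, and real language bindings rely on more capable serializers than the standard library's marshal module.

```python
import builtins
import marshal
import types


def serialize_function(func):
    """Turn a closure-free function into bytes that can be shipped elsewhere."""
    return marshal.dumps(func.__code__)


def deserialize_function(payload):
    """Rebuild a callable from the bytes produced by serialize_function."""
    code = marshal.loads(payload)
    # Give the rebuilt function a fresh global namespace with only builtins.
    return types.FunctionType(code, {"__builtins__": builtins})


# "Driver" side: the user-supplied worker code becomes an opaque byte string.
payload = serialize_function(lambda word: (word, len(word)))

# "Worker" side: the bytes are turned back into a function and applied.
worker_func = deserialize_function(payload)
results = [worker_func(w) for w in ["spark", "jvm"]]
```

The point of the sketch is only that the driver never runs the function on the data; it ships bytes, and whichever process holds the data rebuilds and applies the function.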
On the worker side, the Spark worker is always running in the JVM, and if necessary will start another process for the target language and copy the required data and result. This copying is expensive, but Spark’s dependency DAG and clever pipelining minimize the number of times the copying needs to occur. The techniques that the different language APIs use for interfacing their worker code are the same techniques you can use to call your custom code, regardless of the language of your driver.
There are many ways to go outside the JVM, ranging from the Java Native Interface (JNI) to Unix pipes to interfacing with long-running companion servers over sockets. These are the same techniques used inside of Spark’s internals when interfacing with other languages. For example, JNI is used for calling some linear algebra libraries and Unix pipes are used for interfacing with Python code on the workers. The most efficient solution often depends on whether there are multiple transformations that will need to be evaluated, the environment and language setup cost, and the computational complexity of the transformations. Regardless of which specific approach you choose to integrate other languages outside the JVM, they all currently require copying your data from the JVM to the runtime of your target language. Work on both Tungsten and Arrow integration means that in the future it will be easier to work with data from Spark outside of the JVM.
Not all languages require going outside of the JVM, and using these languages with Spark can avoid the expensive copy of the data from the Spark worker to the target language. Some languages take a mixed approach, like the Eclair JS project (see “How Eclair JS Works”), which executes the worker inside of the JVM but leaves the driver program outside of the JVM. While there is, of course, some overhead in having the driver program outside of the JVM, the amount of data that needs to be passed between the Scala driver and target driver is much smaller compared to the amount of data processed by even just one of the workers.
This section will look at how to access the Spark APIs from different languages within the JVM and some of the performance considerations of going outside of Scala. Even if you are going outside of the JVM, it is useful to understand this section since the non-JVM languages often depend on the Java APIs rather than the Scala APIs.
Working in other languages doesn’t always mean having to move beyond the JVM, and staying within the JVM can have many performance benefits—mostly from not having to copy data.
While you don’t necessarily need special bindings or wrappers to access Spark outside of Scala, calling Scala code can be difficult from other languages.
Spark supports Java 8 lambdas for use within transformations, and users with older versions of the JDK can implement the corresponding interface from org.apache.spark.api.java.function.
Even when data doesn’t need to be copied, working in a different language can have small, yet important, performance considerations.
The difficulty with accessing the Scala APIs is especially true for accessing functions with class tags or using functionality provided through implicit conversions (such as all of the Double and Tuple specific functionality on RDDs).
For functionality that depends on implicit conversions, equivalent classes are often provided along with explicit transformations to these concrete classes.
For functions that depend on class tags, “fake” class tags (e.g., AnyRef) can be supplied (and are often supplied automatically by wrappers).
Using the concrete class instead of the implicit conversion generally doesn’t add any overhead, but the fake class tags can limit some of the compiler optimizations.
The Java API is kept quite close to the Scala API in terms of features, with only the occasional functionality or Developer API not being available. Support for other JVM languages, like Clojure with Flambo and sparkling, is done using the Java APIs instead of calling the Scala APIs directly. Since most of the language bindings, even non-JVM languages like Python and R, go through the Java APIs, it is useful to understand the Java APIs.
The Java APIs closely resemble the Scala APIs, while avoiding depending on class tags or implicit conversions.
The lack of implicit conversions means that rather than automatically converting RDDs containing Tuples or doubles to special classes with additional functions, explicit function conversions must be used (such as mapToDouble and mapToPair).
These functions are only defined on Java RDDs; thankfully, for interoperability, these special types are simply wrappers of Scala RDDs.
These special functions also return different types, such as JavaDoubleRDD and JavaPairRDD, which have the functionality that is provided by the implicit conversions in Scala.
Let’s revisit the canonical word count example using the Java APIs (Example 7-1). Since it can sometimes be convoluted to call the Scala API from Java, Spark’s Java APIs are mostly implemented in Scala while hiding class tags and implicit conversions. This allows the Java wrappers to be a very thin layer, consisting of only a few lines on average, with very little reimplementation required.
import scala.Tuple2;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.regex.Pattern;
import java.util.Arrays;

public final class WordCount {
  private static final Pattern pattern = Pattern.compile(" ");

  public static void main(String[] args) throws Exception {
    JavaSparkContext jsc = new JavaSparkContext();
    JavaRDD<String> lines = jsc.textFile(args[0]);
    // Split each line into words
    JavaRDD<String> words = lines.flatMap(e -> Arrays.asList(
      pattern.split(e)).iterator());
    // Pair each word with an initial count of 1
    JavaPairRDD<String, Integer> wordsInitial = words.mapToPair(
      e -> new Tuple2<String, Integer>(e, 1));
  }
}
Spark supports Java 8 lambdas for most transformations.
If you are working with an earlier version of Java you will need to create instances of the interfaces in the org.apache.spark.api.java.function package. The interface names are generally similar to the name of the transformation (e.g., FlatMapFunction and DoubleFunction).
Sometimes you may want to convert your Java RDDs to Scala RDDs or vice versa. Most commonly this is for libraries that require or return Scala RDDs, but sometimes core Spark functionality may not yet be available in the Java API and converting your RDD to a Scala RDD is an easy way to access the new functionality.
If you have a Java RDD you want to pass to a Scala library expecting a regular Spark RDD, you can access the underlying Scala RDD with rdd().
Most often this is sufficient to pass the resulting RDD to whichever Scala library you need to call; some notable exceptions are Scala libraries that depend on implicit conversions of the contents of the RDD or class tag information.
In this case writing a small wrapper in Scala can be the simplest way to access the implicit conversions.
If a Scala shim is out of the question, you can explicitly call the corresponding function on the JavaConverters object and construct a fake class tag.
To construct a fake class tag you can use scala.reflect.ClassTag$.MODULE$.AnyRef(), or get the actual class tag with scala.reflect.ClassTag$.MODULE$.apply(CLASS), as illustrated in Examples 7-2 and 7-3.
Going from a Scala RDD to a Java RDD requires class tag information more often than most Spark libraries do.
This is because, while the different JavaRDDs expose public constructors that take Scala RDDs as arguments, these are intended to be called from within Scala and therefore expect class tag information.
If you are in a mixed language project or library, consider constructing the Java RDD in Scala, where the class tag information is more easily available.
Fake class tags are most commonly used in generic or templated code in which you don’t know the exact types at compile time. Using fake class tags often works, although some specialization may be lost on the Scala side; very occasionally the Scala code depends on correct class tag information. In this case you must use a real class tag. In most cases, using a real class tag is not substantially more effort and can offer performance advantages, so use them when possible.
public static JavaPairRDD wrapPairRDDFakeCt(
    RDD<Tuple2<String, Object>> rdd) {
  // Construct the class tags by casting AnyRef - this would be more commonly done
  // with generic or templated code where we can't explicitly construct the correct
  // class tag as using fake class tags may result in degraded performance.
  ClassTag<Object> fake = ClassTag$.MODULE$.AnyRef();
  return new JavaPairRDD(rdd, fake, fake);
}

public static JavaPairRDD wrapPairRDD(
    RDD<Tuple2<String, Object>> rdd) {
  // Construct the class tags
  ClassTag<String> strCt = ClassTag$.MODULE$.apply(String.class);
  ClassTag<Long> longCt = ClassTag$.MODULE$.apply(scala.Long.class);
  return new JavaPairRDD(rdd, strCt, longCt);
}
Both the Spark SQL and the ML pipeline APIs are mostly unified between Scala and Java.
There are still Java-specific helper functions in which the equivalent Scala function is difficult to call.
Some examples of this are the various numeric functions, like plus, minus, etc., on Column, as the overloaded Scala equivalents (+, -) cannot be easily accessed.
Rather than having a JavaDataFrame and a JavaSQLContext, the methods required for Java access are available on the regular DataFrame and SQLContext.
This can be somewhat confusing, as some of the methods that will appear in the JavaDoc may not be usable from Java, but in those cases similarly named functions will be provided to be called from Java.
Java UDFs, and by extension most other non-Scala languages, require specifying the return type of your function as it can’t be inferred in the same way it is done in Scala (Example 7-4).
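// The return type must be given explicitly; s.length() is an int,
// so the matching Spark SQL type is IntegerType.
sqlContext.udf()
  .register("strlen",
            (String s) -> s.length(), DataTypes.IntegerType);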
While the types required by the Scala and Java APIs are different, for the most part, the Java collection types can be wrapped without requiring an extra copy. For iterators, the wrap conversion can be done lazily as the elements are accessed, allowing Spark to spill the data as needed (as discussed in “Iterator-to-Iterator Transformations with mapPartitions”). This is especially important since for many simple operations the cost of copying the data can quickly dominate the actual computation required.
Going beyond the JVM greatly opens up the scope of different languages available for you to work in. However, in its current architecture, going outside of the JVM in Spark—especially on the workers—can involve a substantial performance cost of copying data on worker nodes between the JVM and the target language. For complex operations the cost of copying the data is relatively low, but for simpler operations the cost of copying the data can easily double the computation cost.
The first non-JVM language to be directly supported inside of Spark is Python, and its API and interface have become a model that other non-JVM languages have based their implementations on.
PySpark connects to JVM Spark using a mixture of pipes on the workers and Py4J, a specialized library for Python/Java interoperability, on the driver. This relatively simple architecture hides a large number of complexities involved in making PySpark work, as Figure 7-1 shows. One of the bigger challenges is that even once the data has been copied from the Python worker to the JVM, it isn’t in a form the JVM can easily parse. This requires special handling on both the Python worker and Java to ensure sufficient information for things like partitioning is available in the JVM.
After the initial reading from persistent storage (like HDFS or S3) and between shuffles, the data on the workers needs to be passed between the JVM and Python.
Is IPython your jam? In Spark 2.0+ the syntax to get an IPython notebook has changed from IPYTHON_OPTS="notebook" to PYSPARK_DRIVER_PYTHON="ipython" PYSPARK_DRIVER_PYTHON_OPTS="notebook".
Transferring the data to and from the JVM and starting the Python executor has significant overhead.
Using the DataFrame/Dataset API avoids many of the performance challenges with the PySpark RDD API by keeping the data inside the JVM for as long as possible.
Copying the data from the JVM to Python is done using sockets and pickled bytes.
A more general version of this, for talking to programs in other languages, is available through the PipedRDD interface illustrated in “Using Pipe and Friends”.
Since piping the data back and forth for each transformation would be expensive, PySpark pipelines Python transformations inside of the Python interpreter when possible, so a filter then a map will be chained together on the iterator of Python objects using a specialized PipelinedRDD.
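The chaining described above can be illustrated without Spark. The following is a minimal sketch of fusing a filter and a map into one function over a partition's iterator; the names pipeline, filter_stage, and map_stage are made up for this illustration and are not PySpark APIs.

```python
from functools import reduce


def pipeline(*stages):
    """Fuse per-partition iterator functions into a single function."""
    def fused(iterator):
        return reduce(lambda it, stage: stage(it), stages, iterator)
    return fused


def filter_stage(predicate):
    """Wrap a predicate as an iterator-to-iterator function."""
    return lambda it: (x for x in it if predicate(x))


def map_stage(func):
    """Wrap an element function as an iterator-to-iterator function."""
    return lambda it: (func(x) for x in it)


# A filter followed by a map becomes one lazy pass over the partition's
# iterator, so the elements would cross the JVM/Python boundary only once.
fused = pipeline(filter_stage(lambda x: x % 2 == 0), map_stage(lambda x: x * 10))
output = list(fused(iter(range(6))))
```

Because each stage is a generator, no intermediate collection is built between the filter and the map.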
Even when the data has to be shuffled and PySpark is unable to chain our transformations inside of a single worker VM, the Python interpreter is capable of being reused so the interpreter startup overhead doesn’t further slow us down.
This is only part of the puzzle. Normal PipedRDDs work on Strings, which can’t easily be shuffled since there is no inherent key.
The approach taken in PySpark, and mirrored in many other language bindings, is a special PairwiseRDD in which the key must be a long and only the key is deserialized, using custom Scala code to parse the Python value.
This deserialization is not overly expensive, but does serve to illustrate that for the most part, Spark Scala treats the results of Python as opaque byte arrays.
Since there is some overhead associated with serialization and deserialization, PySpark uses a batch serializer, and this can occasionally result in unexpected effects (for example, when repartitioning, PySpark will not split up elements in the same batch).
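To see why batching can have this effect, consider a small stand-alone sketch of batch serialization using only the standard library. The helpers dump_batches and load_batches are hypothetical and far simpler than PySpark's serializers, but they show how code that only sees opaque blobs can move whole batches around without being able to split them.

```python
import pickle
from itertools import islice


def dump_batches(records, batch_size):
    """Pickle records in fixed-size batches; each batch becomes one bytes blob."""
    iterator = iter(records)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield pickle.dumps(batch)


def load_batches(blobs):
    """Unpickle each blob and yield the individual records again."""
    for blob in blobs:
        yield from pickle.loads(blob)


blobs = list(dump_batches(range(10), batch_size=4))

# Code that sees only the opaque blobs can redistribute whole batches,
# but it cannot split the records inside a batch.
partitions = [blobs[0::2], blobs[1::2]]
sizes = [len(list(load_batches(p))) for p in partitions]
```

Ten records in batches of four produce three blobs, so splitting the blobs across two partitions gives six records on one side and four on the other rather than an even five and five.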
For all its simplicity this approach to integrating works surprisingly well, with the majority of operations on Scala RDDs available in Python. Some of the more difficult places are interacting with libraries, such as MLlib, and loading and saving from different sources.
Interacting with different formats is another restriction, as much of Spark’s load/save code is based on Hadoop’s Java interfaces. This means that any data loaded is initially loaded into the JVM and then transferred to Python.
For interacting with MLlib, generally two approaches have been taken: either a specialized data type is used in PySpark with equivalent Scala decoders, or the algorithm is reimplemented in Python.
These problems are avoided with Spark ML, which uses the DataFrame/Dataset interface that generally keeps the data stored in the JVM.
DataFrames and Datasets avoid many of the performance downsides of the Python RDD API by keeping the data inside the JVM for as long as possible.
The same benchmark we did to illustrate DataFrames’ general improvement over RDDs (Figure 3-1) shows a greater difference when rerun in Python (Figure 7-2).
For many operations on DataFrames and Datasets, the data may never actually need to leave the JVM, although using Python UDFs, UDAFs, or lambdas naturally requires transferring some of the data from the JVM to Python. This results in a simplified architecture diagram for many operations, which instead of Figure 7-1, looks like Figure 7-3.
PySpark doesn’t use Jython because it has been found that a lot of Python users need access to libraries, like numpy, scipy, and pandas, which do not work well in Jython.
Some early work is being investigated to see if Jython can be used to accelerate Python UDFs, which don’t depend on C extensions. See SPARK-15369 for updates.
An important implication of the PySpark architecture is that many of Spark’s Python classes simply exist as wrappers to translate your Python calls to the JVM.
If you work with Scala/Java developers and you wish to collaborate, preexisting wrappers won’t exist to call your own code, but you can register Java/Scala UDFs and then use them from Python. Starting in Spark 2.1 this can be done with the registerJavaFunction utility on the sqlContext.
Sometimes these wrappers don’t do everything you need, and since Python doesn’t have strong protections around accessing private methods, you can jump directly into the JVM. The same techniques can be used to call your own JVM code, and with a bit of work translate the results into Python objects.
While the Py4J API is accessible, these techniques depend on implementation details of PySpark, and these implementation details may change between releases.
Thinking back to “Large Query Plans and Iterative Algorithms”, we suggested that it was important to use the JVM version of DataFrames and RDDs to cut the query plan.
This is a workaround for when a query plan becomes too large for the Spark SQL optimizer to process: by putting an RDD in the middle, the SQL optimizer can’t see back past the point where the data is in an RDD.
While you could accomplish the same thing using public Python APIs, you would lose much of the advantage of DataFrames, as the entire data would need to be round-tripped through the Python workers.
Instead, by using some of the internal APIs, you can cut the lineage from Python while keeping the data in the JVM (as shown in Example 7-5).
from pyspark.sql import DataFrame


def cutLineage(df):
    """
    Cut the lineage of a DataFrame - used for iterative algorithms

    .. Note: This uses internal members and may break between versions
    >>> df = rdd.toDF()
    >>> cutDf = cutLineage(df)
    >>> cutDf.count()
    3
    """
    jRDD = df._jdf.toJavaRDD()
    jSchema = df._jdf.schema()
    jRDD.cache()
    sqlCtx = df.sql_ctx
    # The name of the internal Java SQL context differs between versions
    try:
        javaSqlCtx = sqlCtx._jsqlContext
    except Exception:
        javaSqlCtx = sqlCtx._ssql_ctx
    newJavaDF = javaSqlCtx.createDataFrame(jRDD, jSchema)
    newDF = DataFrame(newJavaDF, sqlCtx)
    return newDF
In general, the convention for most Python objects is _j[shortname] to access the underlying Java version.
So, for example, the SparkContext has _jsc to get at the underlying Java SparkContext.
This is only available on the driver program, so if any PySpark objects are sent to the workers you won’t be able to access the underlying Java component and large parts of the API will not work.
The Python APIs generally wrap Java versions of the API rather than directly wrapping the Scala versions.
If you want to access a JVM Spark class that does not already have a Python wrapper, you can directly use the Py4J gateway on the driver.
The SparkContext contains a reference to the gateway in _gateway.
Arbitrary Java objects can be accessed with sc._gateway.jvm.[fulljvmclassname].
Py4J depends heavily on reflection to determine which methods to call. This is normally not a problem, but can become confusing with numeric types. Attempting to call a Scala function expecting a Long with an Integer will result in an error message about not being able to find the method, even though in Python the distinction normally would not matter.
The same technique works for your own Scala classes provided they are on the class path.
You can add JARs to the class path by passing --jars to spark-submit or by setting the spark.driver.extraClassPath configuration property.
Example 7-6, which we used to generate Figure 7-2, is intentionally structured to use the existing Scala code to generate the performance testing data.
from pyspark.sql import DataFrame
from pyspark.sql.types import (DoubleType, IntegerType, StructField,
                               StructType)


def generate_scale_data(sqlCtx, rows, numCols):
    sc = sqlCtx._sc
    # Get the SQL Context, 2.1, 2.0 and pre-2.0 syntax - yay internals :p
    try:
        try:
            javaSqlCtx = sqlCtx._jsqlContext
        except Exception:
            javaSqlCtx = sqlCtx._ssql_ctx
    except Exception:
        javaSqlCtx = sqlCtx._jwrapped
    jsc = sc._jsc
    scalasc = jsc.sc()
    gateway = sc._gateway
    # Call a java method that gives us back an RDD of JVM Rows (Int, Double)
    # While Python RDDs are wrapped Java RDDs (even of Rows) the contents are
    # different, so we can't directly wrap this.
    # This returns a Java RDD of Rows - normally it would better to
    # return a DataFrame directly, but for illustration we will work
    # with an RDD of Rows.
    java_rdd = (gateway.jvm.com.highperformancespark.examples.
                tools.GenerateScalingData.
                generateMiniScaleRows(scalasc, rows, numCols))
    # Schemas are serialized to JSON and sent back and forth
    # Construct a Python Schema and turn it into a Java Schema
    schema = StructType([
        StructField("zip", IntegerType()),
        StructField("fuzzyness", DoubleType())])
    # 2.1 / pre-2.1
    try:
        jschema = javaSqlCtx.parseDataType(schema.json())
    except Exception:
        jschema = sqlCtx._jsparkSession.parseDataType(schema.json())
    # Convert the Java RDD to Java DataFrame
    java_dataframe = javaSqlCtx.createDataFrame(java_rdd, jschema)
    # Wrap the Java DataFrame into a Python DataFrame
    python_dataframe = DataFrame(java_dataframe, sqlCtx)
    # Convert the Python DataFrame into an RDD
    pairRDD = python_dataframe.rdd.map(lambda row: (row[0], row[1]))
    return (python_dataframe, pairRDD)
Attempting to use the Py4J bridge inside of your transformations will fail at runtime.
While many of the Python classes are simply wrappers of Java objects, not all Java objects can directly be wrapped into Python objects and then used in Spark.
For example, objects in PySpark RDDs are represented as pickled strings, which can only be easily parsed in Python.
Thankfully, DataFrames are standardized between the languages, so provided you can convert your data into a DataFrame, you can then wrap it in Python and use it directly as a Python DataFrame or convert the Python DataFrame to a Python RDD.
Scala UDFs and UDAFs can be used from Python without having to go through the Py4J API.
Often a large part of the reason one wants to use a language other than Scala is for the libraries that are available with that language. In addition to language-specific libraries, you may need to include libraries for Spark itself to use, especially when working with different data formats. There are a few different options for using both Spark-specific and language-specific libraries in PySpark.
Spark Packages is a system that allows us to easily include JVM dependencies with Spark. A common reason for wanting additional JVM libraries in PySpark is support for additional data formats.
If you are working in the Scala shell you can use the --packages command-line argument to specify the Maven coordinates of a package you want in the shell.
If you are building a Scala package you can also add any requirements to your assembly .jar.
For Python, you can create a Java or Scala project with your JVM dependencies and add the .jar with --jars.
If you’re working in the PySpark shell, command-line arguments aren’t allowed, so you can instead specify the spark.jars.packages configuration variable.
When using Spark Packages the dependencies are automatically fetched from Maven and distributed to the cluster. If your JVM dependency is not available in Maven, you can use the same technique we discuss next for adding local Python dependencies.
Adding local dependencies with PySpark can be done at both job submission time and dynamically using the SparkContext.
Local dependencies can be .jar files, for JVM requirements, or .zip and .egg for Python dependencies, which are automatically added to the PYTHONPATH.
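A .zip file can serve as a Python dependency because Python is able to import modules directly from an archive on its module search path. The sketch below demonstrates just that mechanism in plain Python, without Spark; the archive and module names (deps.zip, hps_example_dep) are made up for illustration, and on a real cluster Spark takes care of shipping the file and extending the path on the workers.

```python
import importlib
import os
import sys
import tempfile
import zipfile

# Build a tiny dependency archive containing one hypothetical module.
workdir = tempfile.mkdtemp()
archive = os.path.join(workdir, "deps.zip")
with zipfile.ZipFile(archive, "w") as zf:
    zf.writestr("hps_example_dep.py",
                "def shout(s):\n    return s.upper() + '!'\n")

# Putting the archive on the module search path is enough for imports to work.
sys.path.insert(0, archive)
importlib.invalidate_caches()
dep = importlib.import_module("hps_example_dep")
greeting = dep.shout("panda")
```

Note that this only works for pure-Python code; packages with compiled extensions generally cannot be imported from inside an archive this way.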
There is currently work under way to allow Python Spark programs to specify required pip packages and have them auto installed, but the proposal has not yet been accepted. See the pull request and SPARK-5929 for the status of this proposal.
For individuals working with a CDH cluster, it is now possible to easily add packages with Anaconda. Cloudera’s post Making Python on Apache Hadoop Easier details how to install the packages on your cluster. To make the resulting packages accessible to Apache Spark, all you need to do is set the shell environment variable PYSPARK_PYTHON to /opt/cloudera/parcels/Anaconda/bin/python, either with export in your shell profile or in your spark-env.sh file.
If none of the above work for your cluster configuration there are a few remaining options, all of which are somewhat less than ideal. The simplest, but very hacky, approach is to simply have your transformations explicitly import the package and on failure, perform a pip installation. Similar approaches can be done with broadcast variables or a setup map at the start of the program. Failing that you can ask your cluster administrator to install the package systemwide with parallel-ssh or similar, as shown in Example 7-7.
parallel-ssh -h ./conf/slaves pip install <package-name>
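The “import, and on failure install” fallback mentioned above might look something like the following sketch. The helper names import_or_install and pip_install are hypothetical, and the approach assumes the worker's Python environment is writable and has network access. It is a last resort rather than a recommended practice, since several tasks on one host may race to install the same package.

```python
import importlib
import subprocess
import sys


def pip_install(package):
    """Install a package into the current interpreter's environment."""
    subprocess.check_call([sys.executable, "-m", "pip", "install", package])


def import_or_install(module_name, package=None, installer=pip_install):
    """Import module_name, installing it first only if the import fails."""
    try:
        return importlib.import_module(module_name)
    except ImportError:
        installer(package or module_name)
        importlib.invalidate_caches()
        return importlib.import_module(module_name)


# An already-available module is returned without touching pip.
json_module = import_or_install("json")
```

Inside a job, you would call a helper like this at the top of the function you pass to a transformation such as mapPartitions, so that the check runs on the worker rather than on the driver.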
First-party languages for Spark don’t require any separate installation, but as mentioned for Python packages, Python has its own mechanisms for dealing with package management.
Installation with pip was added in PySpark version 2.1; from that version on, you can download the PySpark package from the Apache download mirror and run pip install pyspark-2.1.0.tar.gz, which allows virtualenv support as well.
PySpark 2.2.0 and later are published directly to PyPI, allowing for an even simpler pip install pyspark.
Once you have PySpark pip installed you can then start your favorite Python interpreter and import pyspark like any other package, or start the PySpark shell with pyspark.
It’s important to note that pip installing Spark is optional. If you wish you can run PySpark from a regular Spark setup without pip installation (although then you must use spark-submit or pyspark from the Spark bin directory).
SparkR takes a similar approach to PySpark, but does not currently expose the ability to run arbitrary R code in the workers.
While a similar PipedRDD wrapper exists for R as it does for Python, it is kept internal and the only public interface for working with R is through DataFrames.
Of the directly supported languages, SparkR is the furthest away from Scala Spark in terms of feature completeness. This gap will likely close over time, but be careful when selecting SparkR to ensure it has the features you need. The API documentation will give you an idea if what you are looking for is already available.
To give you an idea of what the SparkR interface looks like, the standard word count example has been rewritten in R in Example 7-8.
library(SparkR)

# Setup SparkContext & SQLContext
sc <- sparkR.init(appName = "high-performance-spark-wordcount-example")

# Initialize SQLContext
sqlContext <- sparkRSQL.init(sc)

# Load some simple data
df <- read.text(fileName)

# Split the words (the inner quotes must be escaped inside the R string)
words <- selectExpr(df, "split(value, \" \") as words")

# Compute the count
explodedWords <- select(words, alias(explode(words$words), "words"))
wc <- agg(groupBy(explodedWords, "words"), "words" = "count")

# Attempting to push an array back fails
# resultingSchema <- structType(structField("words", "array<string>"))
# words <- dapply(df, function(line) {
#   y <- list()
#   y[[1]] <- strsplit(line[[1]], " ")
# }, resultingSchema)
# Also attempting even the identity transformation on a DF from read.text fails
# in Spark 2.0-preview (although works fine on other DFs).

# Display the result
showDF(wc)
To execute your own custom R code you can use the dapply method on DataFrames as illustrated in Example 7-9.
SparkR’s custom code execution support has a long way to go, as illustrated by the difficulty of attempting to perform a word count with dapply in Example 7-8.
library(SparkR)

# Setup SparkContext & SQLContext
sc <- sparkR.init(appName = "high-performance-spark-wordcount-example")

# Initialize SQLContext
sqlContext <- sparkRSQL.init(sc)

# Count the number of characters - note this fails on the text DF due to a bug.
df <- createDataFrame(sqlContext,
  list(list(1L, 1, "1"),
       list(2L, 2, "22"),
       list(3L, 3, "333")),
  c("a", "b", "c"))
resultingSchema <- structType(structField("length", "integer"))
result <- dapply(df, function(row) {
  y <- list()
  y <- cbind(y, nchar(row[[3]]))
}, resultingSchema)
showDF(result)
Internally dapply is implemented in a similar way to Python’s UDF support, but since the RDD API isn’t exposed it leaves more potential for future optimizations and encourages development with the more optimized DataFrame APIs.
As with PySpark, arbitrary non-JVM code execution is slower than traditional Scala Spark code.
SparkR isn’t the only interface for running Spark and R together. Sparklyr is a third-party library, from RStudio, which is also quite popular. From a performance point of view, it shares the same underlying mechanisms as SparkR in interfacing with the JVM.
Spark.jl is one of the newer projects to provide bindings for Spark and as such does not yet have a fully functional subset of the API supported.
Spark.jl is incredibly easy to install (see Example 7-10), and it automatically installs a supported version of Spark alongside it.
The general design of Spark.jl is similar to that of PySpark, with a custom implementation of the PipedRDD, implemented inside of the JVM, that is able to parse limited amounts of serialized data from Julia.
The same general performance caveats of using PySpark also apply to Spark.jl.
Pkg.clone("https://github.com/dfdx/Spark.jl")
Pkg.build("Spark")
# we also need latest master of JavaCall.jl
Pkg.checkout("JavaCall")
As of this writing, named Julia functions cannot be fully serialized, so functions used inside of transformations should be anonymous.
Until keyed operations are supported in Spark.jl we can’t even build the simple word count example.
Namely, reduceByKey is missing, which is required for shuffling; others, like flatMap, are also missing, but they can be replaced with mapPartitions.
For now Spark.jl is an early stage project that shows promise but is not ready for use.
Eclair JS takes a different approach than R and Python support, mostly staying inside the JVM except for the driver program. Eclair JS runs JavaScript in both the JVM and V8 JavaScript engine, with the functions inside of the transformations being evaluated by the JVM using Nashorn. The split between driver-side and worker-side evaluation allows for fast integration on the workers and NodeJS bindings on the driver. See Figure 7-4 for a diagram of this.
This somewhat unorthodox approach means that certain library functions may not be available inside of the transformations, but saves us from the double serialization problem found in PySpark and SparkR UDFs. The node driver communicates using Apache Toree to send the required functions to the JVM, which then sends them to the workers.
Installing Eclair JS is easy relative to other languages as the worker side is able to run without any extra packages. The getting started guide walks you through the setup process.
While Eclair JS presents some interesting novel ideas, it has been deprecated.
Microsoft’s Mobius project provides C# bindings for working with Apache Spark.
The general design is similar to that of PySpark, with the internals of PythonRDD instead communicating with the CLR.
As with PySpark, RDD transformations involve copying the data from the JVM, and DataFrame transformations that don’t use UDFs in C# don’t require copying the data on the workers (or even launching the CLR).
If you are curious about using Mobius you can check out the design documents and examples.
In addition to using other languages to call Spark, we can call other languages from Spark.
If there aren’t existing wrappers for the language you are working with, one of the simplest options is using Spark’s pipe interface.
To use the pipe interface you start by converting your RDDs into a format in which they can be sent over a Unix pipe.
Often simple formats like JSON or CSV are used for communicating, as lightweight libraries exist for generating and parsing these records in many languages.
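Because pipe hands each record to the external program as one line of text, a field containing a line break would silently split a record in two. As a minimal sketch of one way to guard against that (an illustration, not something the examples in this chapter do), the hypothetical LineCodec class below escapes tabs, line breaks, and backslashes so that a list of fields always fits on a single tab-separated line. The class and method names are assumptions made for this sketch; an existing JSON or CSV library would serve the same purpose.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: keeps one record on one line for a line-oriented pipe.
class LineCodec {
    // Escape backslash, tab, newline, and carriage return inside a field.
    static String escape(String field) {
        StringBuilder sb = new StringBuilder();
        for (char c : field.toCharArray()) {
            switch (c) {
                case '\\': sb.append("\\\\"); break;
                case '\t': sb.append("\\t"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                default: sb.append(c);
            }
        }
        return sb.toString();
    }

    // Join escaped fields with tabs; assumes at least one field per record.
    static String encode(List<String> fields) {
        List<String> escaped = new ArrayList<>();
        for (String field : fields) {
            escaped.add(escape(field));
        }
        return String.join("\t", escaped);
    }

    // Split on unescaped tabs and undo the escaping.
    static List<String> decode(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (c == '\t') {
                fields.add(current.toString());
                current.setLength(0);
            } else if (c == '\\' && i + 1 < line.length()) {
                char next = line.charAt(++i);
                if (next == 't') current.append('\t');
                else if (next == 'n') current.append('\n');
                else if (next == 'r') current.append('\r');
                else current.append(next);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString());
        return fields;
    }
}
```

The external program has to apply the same escaping rules on its side, which is one reason widely supported formats like JSON are often the more practical choice.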
Let’s return to the Goldilocks example from “The Goldilocks Example”.
Suppose that in addition to optimal panda porridge temperature, you also wanted to find out which pandas had been commenting on Spark PRs;4 you might cook up a quick little Perl script, as in Example 7-11.
Later on, if you want to use this script in Spark you can use the pipe command to call your Perl script from the workers.
Since pipe only works with strings, you will need to format your inputs as a string and parse the result string back into the correct data type, as in Example 7-12.
#!/usr/bin/perl
use strict;
use warnings;

use Pithub;
use Data::Dumper;

# Find all of the commenters on an issue
my $user = $ENV{'user'};
my $repo = $ENV{'repo'};
my $p = Pithub->new(user => $user, repo => $repo);
# Each line of input is one issue ID
while (my $id = <>) {
    chomp($id);
    my $issue_comments = $p->issues->comments->list(issue_id => $id);
    # Emit one line per issue: the ID followed by each commenter's login
    print $id;
    while (my $comment = $issue_comments->next) {
        print " " . $comment->{"user"}->{"login"};
    }
    print "\n";
}
import org.apache.spark.{SparkContext, SparkFiles}
import org.apache.spark.rdd.RDD

def lookupUserPRS(sc: SparkContext, input: RDD[Int]): RDD[(Int, List[String])] = {
  // Copy our script to the worker nodes with sc.addFile
  // Add file requires absolute paths
  val distScriptName = "ghinfo.pl"
  val userDir = System.getProperty("user.dir")
  val localScript = s"${userDir}/src/main/perl/${distScriptName}"
  sc.addFile(localScript)

  // Pass environment variables to our worker
  val environmentVars = Map("user" -> "apache", "repo" -> "spark")
  val result = input.map(x => x.toString)
    .pipe(SparkFiles.get(distScriptName), environmentVars)
  // Parse the results: the issue ID followed by the commenters' logins
  result.map{record =>
    val elems: Array[String] = record.split(" ")
    (elems(0).toInt, elems.slice(1, elems.size).sorted.distinct.toList)
  }
}
Spark will not automatically copy your script to the worker machines, so if you are calling a custom program you can use the sc.addFile interface as in Example 7-12. Otherwise (e.g., if you are calling a systemwide program), just skip that part.
PySpark and SparkR both use specialized versions of piped RDDs for communication on the workers.
Make sure that you handle empty partitions, since your program will be called even for empty partitions (although this functionality may change in future versions).
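To see the pipe contract in isolation, without a cluster, the following plain Java sketch mimics what happens for a single partition: each element is written to the command’s standard input as one line, and each line of standard output becomes one output element. This is a local approximation under stated assumptions, not Spark’s implementation; the LocalPipe class is hypothetical, and it assumes the command (here the Unix cat program) is available on the path.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical local stand-in for piping one partition through a command.
class LocalPipe {
    static List<String> pipe(List<String> partition, String... command) {
        try {
            // The process is started even when the partition has no elements.
            Process proc = new ProcessBuilder(command)
                .redirectError(ProcessBuilder.Redirect.INHERIT)
                .start();
            // Writing everything before reading is only safe for small inputs;
            // a large partition would need a separate writer thread.
            try (BufferedWriter out = new BufferedWriter(new OutputStreamWriter(
                    proc.getOutputStream(), StandardCharsets.UTF_8))) {
                for (String record : partition) {
                    out.write(record);
                    out.write('\n');
                }
            }
            // Each line of stdout becomes one element of the result.
            List<String> results = new ArrayList<>();
            try (BufferedReader in = new BufferedReader(new InputStreamReader(
                    proc.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = in.readLine()) != null) {
                    results.add(line);
                }
            }
            int exit = proc.waitFor();
            if (exit != 0) {
                throw new IllegalStateException("command exited with status " + exit);
            }
            return results;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Calling LocalPipe.pipe with an empty list and your own script in place of cat is a quick way to check the empty-partition case: the script should see end-of-input immediately and exit cleanly without printing a partial record.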
The Java Native Interface (JNI) is another option for interfacing with other languages.
JNI can work well for calling certain C/C++ libraries, as well as other statically compiled languages like FORTRAN.
While JNI doesn’t exactly suffer from double serialization in the same way calling PySpark or using pipe does, you still need to copy your data out of the JVM and back.
This is why some libraries, such as JBLAS, implement some components inside of the JVM, since once copy cost is added, the performance benefit of native code can go away.
To illustrate how to use JNI with Spark, consider calling a very simple C function that sums all of the nonzero inputs. Its function signature is shown in Example 7-13.
#ifndef _SUM_H
#define _SUM_H

int sum(int input[], int num_elem);

#endif /* _SUM_H */
You can write the JNI specification to call this in either Java (Example 7-14) or Scala (Example 7-15). Although the tooling for Java can be a bit simpler, there is no significant difference between them.
class SumJNIJava {
  public static native Integer sum(Integer[] array);
}
class SumJNI {
  @native def sum(n: Array[Int]): Int
}
Manually writing wrappers takes effort. Check out SWIG to automatically generate parts of your bindings.
Once you have your C function and your JNI class specification, you need to generate your class files and from them generate the binding header (see Example 7-16). The javah command takes the class files and generates a header that is then used to create a C-side wrapper (javah was removed in JDK 10; for Java sources, javac -h generates the same headers).
javah -classpath ./target/examples-0.0.1.jar com.highperformancespark.examples.ffi.SumJNI
For those of you building with SBT, Jakob Odersky’s sbt-jni package makes it easy to integrate your native code with your Scala project.
sbt-jni is published as an SBT plug-in like spark-packages-sbt, and is included by adding an entry to project/plugins.sbt as shown in Example 7-17.
addSbtPlugin("ch.jodersky" %% "sbt-jni" % "1.0.0-RC3")
sbt-jni simplifies generating the header file by adding the javah target to sbt, which will generate the header files and place them in ./target/native/include/.
Once we have our header file we need to write a wrapper in C. The generated header file shouldn’t be modified, but rather imported into our shim as shown in Example 7-18.
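#include "sum.h"
#include "include/com_highperformancespark_examples_ffi_SumJNI.h"
#include <ctype.h>
#include <jni.h>

/*
 * Class:     com_highperformancespark_examples_ffi_SumJNI
 * Method:    sum
 * Signature: ([I)I
 */
JNIEXPORT jint JNICALL Java_com_highperformancespark_examples_ffi_SumJNI_sum
(JNIEnv *env, jobject obj, jintArray ja) {
  jsize size = (*env)->GetArrayLength(env, ja);
  // May return a copy of the array rather than a pointer into the JVM heap
  jint *a = (*env)->GetIntArrayElements(env, ja, 0);
  if (a == NULL) {
    return 0; // the JVM could not provide the elements
  }
  jint result = sum(a, size);
  // Release the elements; JNI_ABORT skips the copy back since we did not modify them
  (*env)->ReleaseIntArrayElements(env, ja, a, JNI_ABORT);
  return result;
}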
sbt-jni also simplifies building and packaging native code, adding nativeCompile, javah, and packageBin to allow you to easily build an assembly JAR with both your native files and Java artifacts.
For sbt-jni to build your native code (in addition to the JVM code), you need to provide a Makefile.
If you are starting with a new project, the nativeInit CMake target will generate a skeleton CMakeLists.txt file you can use as a basis for your native build.
In our example project, we’ve built the native code along with the Scala code. Alternatively, especially if you plan to support multiple architectures, you may wish to create a separate package for your native code.
If your artifact is built with sbt-jni you can use the nativeLoader decorator from ch.jodersky.jni.nativeLoader to automatically load your native code as needed.
In the example we’ve been working on, our library is called libhigh-performance-spark0 so we can have it automatically loaded by adding the decorator to our SumJNI class, as in Example 7-19.
@nativeLoader("high-performance-spark0")
If you are working in Java, or just want more control, you can use System.loadLibrary, which takes a library name and searches java.library.path, or System.load, which takes an absolute path.
Leave off the “lib” prefix, which loadLibrary (and sbt-jni) automatically prepend, or you will get confusing runtime linking errors.
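The prefix handling is easy to inspect from the JVM itself. As a small sketch, the standard System.mapLibraryName method returns the platform-specific file name that corresponds to a library name; the LibraryNames wrapper class is hypothetical and exists only to give the call a home.

```java
// Hypothetical wrapper showing how a library name maps to a file name.
class LibraryNames {
    // Returns the platform-specific file name for the given library name,
    // e.g. with a "lib" prefix and ".so" suffix on Linux.
    static String fileNameFor(String libraryName) {
        return System.mapLibraryName(libraryName);
    }
}
```

On a Linux JVM, LibraryNames.fileNameFor("high-performance-spark0") gives libhigh-performance-spark0.so, while passing "libhigh-performance-spark0" gives liblibhigh-performance-spark0.so, a file that does not exist. That mismatch is the usual source of the linking errors mentioned above.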
The Oracle JNI specification can be a useful reference.
If your native library isn’t packaged in your JAR, you need to make sure the JVM running the Spark worker is able to load it.
If your library is already installed on the workers you can add -Djava.library.path=... to your spark.executor.extraJavaOptions.
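When a library still fails to load on the workers, it can help to check what the executor JVM actually sees. The sketch below is a hypothetical diagnostic helper (LibraryPathCheck is not part of Spark or sbt-jni) that looks for a library’s file in each directory of a path list formatted like java.library.path. It only approximates the search, since the JVM’s own lookup may consult additional locations.

```java
import java.io.File;
import java.util.Optional;
import java.util.regex.Pattern;

// Hypothetical diagnostic: where, if anywhere, is the library on the path?
class LibraryPathCheck {
    // Search a path list (same format as java.library.path) for the library.
    static Optional<File> find(String libraryName, String libraryPath) {
        String fileName = System.mapLibraryName(libraryName);
        for (String dir : libraryPath.split(Pattern.quote(File.pathSeparator))) {
            if (dir.isEmpty()) {
                continue;
            }
            File candidate = new File(dir, fileName);
            if (candidate.isFile()) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    // Convenience wrapper that uses the running JVM's java.library.path.
    static Optional<File> findOnJvmPath(String libraryName) {
        return find(libraryName, System.getProperty("java.library.path", ""));
    }
}
```

Running findOnJvmPath inside a transformation reports what each executor sees, which may differ from the driver’s view if the path option was set only on the driver.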
Java Native Access (JNA) is a community-driven alternative to JNI to allow calling of native code, ideally without all of the boilerplate required by JNI. Although JNA is a community package this does not mean it is low quality; it is used by a variety of mature projects and has been used by Spark application developers. We can use JNA to call our previous example in both Scala (Example 7-20) and Java.
import com.sun.jna._

object SumJNA {
  Native.register("high-performance-spark0")
  @native def sum(n: Array[Int], size: Int): Int
}
It’s important to note that these JNA examples skip the requirement for writing the JNI wrapper (as in Example 7-18) and instead directly call the C function for us. While SWIG can do a good job of generating much of the JNI wrappers, for some this is a compelling reason to use JNA over JNI.
When using JNA, jna.boot.library.path allows you to add libraries to the search path before the system library path.
A surprising number of numeric computing libraries still have FORTRAN implementations. Thankfully many of these libraries already have Java or Python wrappers, which greatly simplify our access. These libraries often can make intelligent decisions about what operations are worth the overhead of copying our data into FORTRAN and what operations make more sense to be implemented in the host language. Not all FORTRAN code already has wrappers, however, and you may find yourself with FORTRAN code that you need to interface with directly.
The general process is to first create a C/C++ wrapper that exposes the FORTRAN code for Java to call, and then link the C/C++ code together with the FORTRAN code. Continuing the sum example in FORTRAN (Example 7-21), you would create a C wrapper like Example 7-22, and then follow the existing steps for calling a C library in “JNI”.
       INTEGER FUNCTION SUMF(N,A) BIND(C, NAME='sumf')
       INTEGER A(N)
       SUMF=SUM(A)
       END
// Fortran routine
extern int sumf(int *, int[]);

// Call the fortran code which expects by reference size
int wrap_sum(int input[], int size) {
  return sumf(&size, input);
}
If you like sbt-jni you can extend the generated CMake file to also compile your FORTRAN code.
These wrappers can also be automatically generated with programs like fortwrap, or skipped entirely with JNA. Calling the FORTRAN function with JNA is very similar to calling the C function, as shown in Example 7-23.
import com.sun.jna._
import com.sun.jna.ptr._

object SumFJNA {
  Native.register("high-performance-spark0")
  @native def sumf(n: IntByReference, a: Array[Int]): Int

  def easySum(size: Int, a: Array[Int]): Int = {
    val ns = new IntByReference(size)
    sumf(ns, a)
  }
}
Calling FORTRAN code from the JVM is more difficult than calling C code. If available, it’s often better to use existing wrappers as they can make intelligent decisions about which components to execute in FORTRAN rather than in the JVM.
GPUs are another great way of working with parallel, numeric computing problems. They have been shown to be particularly effective at certain types of machine learning problems. Some single-node distributed systems exist just to coordinate the work of multiple GPUs. If your problem is well suited to GPU acceleration, the performance improvement can be huge (SparkGPULR showed a 3× improvement).
The GPUEnabler Spark package exists to simplify interfacing Spark with CUDA. The package simplifies the setup of JCUDA and automates converting your data into a columnar format for working on GPUs.
Some people have also used aparapi to automate compilation of Java code to OpenCL, although no packages exist to simplify the integration currently.
At present there is no unified way inside of Apache Spark to perform GPU acceleration, with competing proposals from IBM (spark-gpu), Adobe (spark-gpu), and others.
For those interested you may wish to follow SPARK-12620 and friends.
Tungsten has the ability to store data off-heap with Spark, but the data format is currently not stable or sufficiently documented to enable shared access from other languages. Two possibilities exist to improve this: either the standardization of Tungsten, SPARK-9697, or the integration of Arrow in Python and Spark, SPARK-13534. Hopefully future editions of this book will be able to report the awesomeness that these changes have enabled.
Writing high-performance Spark code need not be limited to Scala, let alone the JVM (although it can certainly make things easier). Spark has a wide variety of language bindings, both built-in and third party, and can interface with even more languages using JNI, JNA, pipes, or sockets. For some operations, the cost of copying the data outside of the JVM and back can be more expensive than just doing the operation in the JVM—even with specialized libraries—so it is important to consider the complexity of your transformations before going outside of the JVM. While not currently supported, Tungsten’s off-heap support may eventually standardize in such a way as to better support language interoperability on the workers.
1 There are multiple competing R APIs, but for the purposes of performance they share the same underlying design.
2 Just because support is first party does not mean it will be fast; in some cases third-party bindings have taken interesting work to minimize overhead that has not been implemented in the first-party languages.
3 CUDA is a specialized language for parallel GPU programming from NVIDIA.
4 This is somewhat of a stretch as far as the relationship to Goldilocks goes, but you know.