Chapter 7. Going Beyond Scala

Working in Spark doesn’t mean limiting yourself to Scala, or even limiting yourself to the JVM or the languages that Spark explicitly supports. Spark has first-party APIs for writing driver programs and worker code in R,1 Python, Scala, and Java, with third-party bindings2 for additional languages including JavaScript, Julia, C#, and F#. Spark’s language interoperability can be thought of in two tiers: the first is the worker code inside of your transformations (e.g., the lambdas inside of your maps), and the second is being able to specify the transformations on RDDs/Datasets (e.g., the driver program). This chapter will discuss the performance considerations of using other languages in Spark, and how to effectively work with existing libraries.

Often the language you choose to specify the code inside of your transformations will be the same as the language used to write the driver program, but when working with specialized libraries or tools (such as CUDA3), specifying your entire program in one language can be a hassle, even when it is possible. Spark supports a range of languages for use on the driver, and an even wider range of languages can be used inside of your transformations on the workers. While the APIs are similar between the languages, the performance characteristics of the different languages are quite different once they need to execute outside of the JVM. We will discuss the design behind language support and how the performance difference can impact your work.

Generally the non-JVM language binding calls the Java interface for Spark using an RPC mechanism, such as Py4J, passing along a serialized representation of the code to be executed on the workers. Regardless of the language used to specify the driver program, the Spark workers will execute in the JVM and, if necessary, call the language-specific worker program. If the language you’re looking for doesn’t have a Spark driver binding available, remember that you can still write your transformations to call another language on the workers.

On the worker side, the Spark worker is always running in the JVM, and if necessary will start another process for the target language and copy the required data and results. This copying is expensive, but Spark’s dependency DAG and clever pipelining minimize the number of times the copying needs to occur. The techniques that the different language APIs use to interface with their worker code are the same techniques you can use to call your own custom code, regardless of the language of your driver.

There are many ways to go outside the JVM, ranging from the Java Native Interface (JNI) and Unix pipes to interfacing with long-running companion servers over sockets. These are the same techniques used inside of Spark’s internals when interfacing with other languages. For example, JNI is used for calling some linear algebra libraries and Unix pipes are used for interfacing with Python code on the workers. The most efficient solution often depends on whether multiple transformations will need to be evaluated, the environment and language setup cost, and the computational complexity of the transformations. Regardless of which specific approach you choose to integrate other languages outside the JVM, all of them currently require copying your data from the JVM to the runtime of your target language. Work on both Tungsten and Arrow integration means that in the future it will be easier to work with data from Spark outside of the JVM.

Not all languages require going outside of the JVM, and using these languages with Spark can avoid the expensive copy of the data from the Spark worker to the target language. Some languages take a mixed approach, like the Eclair JS project (see “How Eclair JS Works”), which executes the worker inside of the JVM but leaves the driver program outside of the JVM. While there is, of course, some overhead in having the driver program outside of the JVM, the amount of data that needs to be passed between the Scala driver and target driver is much smaller compared to the amount of data processed by even just one of the workers.

Beyond Scala within the JVM

This section will look at how to access the Spark APIs from different languages within the JVM and some of the performance considerations of going outside of Scala. Even if you are going outside of the JVM, it is useful to understand this section since the non-JVM languages often depend on the Java APIs rather than the Scala APIs.

Working in other languages doesn’t always mean having to move beyond the JVM, and staying within the JVM can have many performance benefits—mostly from not having to copy data. While you don’t necessarily need special bindings or wrappers to access Spark outside of Scala, calling Scala code can be difficult from other languages. Spark supports Java 8 lambdas for use within transformations, and users with older versions of the JDK can implement the corresponding interface from org.apache.spark.api.java.function. Even when data doesn’t need to be copied, working in a different language can have small, yet important, performance considerations.

Accessing the Scala APIs is especially difficult for functions with class tags or for functionality provided through implicit conversions (such as all of the Double- and Tuple-specific functionality on RDDs). For functionality that depends on implicit conversions, equivalent classes are often provided along with explicit transformations to these concrete classes. For functions that depend on class tags, “fake” class tags (e.g., AnyRef) can be supplied (and are often supplied automatically by wrappers). Using the concrete class instead of the implicit conversion generally doesn’t add any overhead, but the fake class tags can limit some of the compiler optimizations.

The Java API is kept quite close to the Scala API in terms of features, with only the occasional functionality or Developer API not being available. Support for other JVM languages, like Clojure with Flambo and sparkling, is done using the Java APIs instead of calling the Scala APIs directly. Since most of the language bindings, even non-JVM languages like Python and R, go through the Java APIs, it is useful to understand the Java APIs.

The Java APIs closely resemble the Scala APIs, while avoiding depending on class tags or implicit conversions. The lack of implicit conversions means that rather than automatically converting RDDs containing Tuples or doubles to special classes with additional functions, explicit function conversions must be used (such as mapToDouble and mapToPair). These functions are only defined on Java RDDs; thankfully, for interoperability, these special types are simply wrappers of Scala RDDs. These special functions also return different types, such as JavaDoubleRDD and JavaPairRDD, which have the functionality that is provided by the implicit conversions in Scala.

Let’s revisit the canonical word count example using the Java APIs (Example 7-1). Since it can sometimes be convoluted to call the Scala API from Java, Spark’s Java APIs are mostly implemented in Scala while hiding class tags and implicit conversions. This allows the Java wrappers to be a very thin layer, consisting of only a few lines on average, with very little reimplementation required.

Example 7-1. Java Word count example
import scala.Tuple2;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.regex.Pattern;
import java.util.Arrays;

public final class WordCount {
  private static final Pattern pattern = Pattern.compile(" ");

  public static void main(String[] args) throws Exception {
    JavaSparkContext jsc = new JavaSparkContext();
    JavaRDD<String> lines = jsc.textFile(args[0]);
    JavaRDD<String> words = lines.flatMap(e -> Arrays.asList(
                                            pattern.split(e)).iterator());
    JavaPairRDD<String, Integer> wordsInitial = words.mapToPair(
      e -> new Tuple2<String, Integer>(e, 1));
    // Sum the counts for each word and write out the result
    // (assumes an output path is passed as the second argument).
    JavaPairRDD<String, Integer> wordCounts = wordsInitial.reduceByKey(
      (e1, e2) -> e1 + e2);
    wordCounts.saveAsTextFile(args[1]);
  }
}
Tip

Spark supports Java 8 lambdas for most transformations. If you are working with an earlier version of Java you will need to create instances of the corresponding interfaces from the org.apache.spark.api.java.function package. The function names are generally similar to the name of the transformation (e.g., FlatMapFunction and DoubleFunction).

Sometimes you may want to convert your Java RDDs to Scala RDDs or vice versa. Most commonly this is for libraries that require or return Scala RDDs, but sometimes core Spark functionality may not yet be available in the Java API and converting your RDD to a Scala RDD is an easy way to access the new functionality.

If you have a Java RDD you want to pass to a Scala library expecting a regular Spark RDD, you can access the underlying Scala RDD with rdd(). Most often this is sufficient to pass the resulting RDD to whichever Scala library you need to call; some notable exceptions are Scala libraries that depend on implicit conversions of the contents of the RDD or on class tag information. In this case writing a small wrapper in Scala can be the simplest way to access the implicit conversions. If a Scala shim is out of the question, you can explicitly call the corresponding conversion function on the JavaConverters object and construct a fake class tag.

To construct a fake class tag you can use scala.reflect.ClassTag$.MODULE$.AnyRef() or get the actual class tag with scala.reflect.ClassTag$.MODULE$.apply(CLASS) as illustrated in Examples 7-2 and 7-3.

Going from a Scala RDD to a Java RDD requires class tag information, something most Spark libraries don’t require of their users. This is because, while the different JavaRDDs expose public constructors that take Scala RDDs as arguments, these are intended to be called from within Scala and therefore expect class tag information.

Tip

If you are in a mixed-language project or library, consider constructing the Java RDD in Scala, where the class tag information is more easily available.

Fake class tags are most commonly used in generic or templated code in which you don’t know the exact types at compile time. Using fake class tags often works, although some specialization may be lost on the Scala side; very occasionally the Scala code depends on correct class tag information, in which case you must use a real class tag. In most cases, using a real class tag is not substantially more effort and can offer performance advantages, so use them when possible.

Example 7-2. Java/Scala RDD interoperability with fake class tag
  public static JavaPairRDD wrapPairRDDFakeCt(
    RDD<Tuple2<String, Object>> rdd) {
    // Construct fake class tags by casting AnyRef. This is most commonly done
    // in generic or templated code where we can't explicitly construct the
    // correct class tag; note that fake class tags may degrade performance.
    ClassTag<Object> fake = ClassTag$.MODULE$.AnyRef();
    return new JavaPairRDD(rdd, fake, fake);
  }
Example 7-3. Java/Scala RDD interoperability
  public static JavaPairRDD wrapPairRDD(
    RDD<Tuple2<String, Object>> rdd) {
    // Construct the class tags
    ClassTag<String> strCt = ClassTag$.MODULE$.apply(String.class);
    ClassTag<Long> longCt = ClassTag$.MODULE$.apply(scala.Long.class);
    return new JavaPairRDD(rdd, strCt, longCt);
  }

Both the Spark SQL and the ML pipeline APIs are mostly unified between Scala and Java. There are still Java-specific helper functions where the equivalent Scala function is difficult to call. Some examples of this are the various numeric functions, like plus, minus, etc., on Column, as the overloaded Scala equivalents (+, -) cannot be easily accessed. Rather than having a JavaDataFrame and a JavaSQLContext, the methods required for Java access are available on the regular DataFrame and SQLContext. This can be somewhat confusing, as some of the methods that appear in the JavaDoc may not be usable from Java, but in those cases similarly named functions are provided to be called from Java.

Java UDFs, and by extension most other non-Scala languages, require specifying the return type of your function as it can’t be inferred in the same way it is done in Scala (Example 7-4).

Example 7-4. Sample Java UDF
    sqlContext.udf()
      .register("strlen",
                (String s) -> s.length(), DataTypes.IntegerType);

While the types required by the Scala and Java APIs are different, for the most part, the Java collection types can be wrapped without requiring an extra copy. For iterators, the wrap conversion can be done lazily as the elements are accessed, allowing Spark to spill the data as needed (as discussed in “Iterator-to-Iterator Transformations with mapPartitions”). This is especially important since for many simple operations the cost of copying the data can quickly dominate the actual computation required.

Warning

In earlier versions of Spark, the Java API mistakenly required Iterable rather than Iterator, which limited the ability to create iterator-to-iterator transformations in Java.

Beyond Scala, and Beyond the JVM

Going beyond the JVM greatly opens up the scope of different languages available for you to work in. However, in its current architecture, going outside of the JVM in Spark—especially on the workers—can involve a substantial performance cost of copying data on worker nodes between the JVM and the target language. For complex operations the cost of copying the data is relatively low, but for simpler operations the cost of copying the data can easily double the computation cost.

The first non-JVM language to be directly supported inside of Spark was Python, and its API and interface have become a model on which other non-JVM languages have based their implementations.

How PySpark Works

PySpark connects to JVM Spark using a mixture of pipes on the workers and Py4J, a specialized library for Python/Java interoperability, on the driver. This relatively simple architecture hides a large number of complexities involved in making PySpark work, as Figure 7-1 shows. One of the bigger challenges is that even once the data has been copied from the Python worker to the JVM, it isn’t in a form the JVM can easily parse. This requires special handling on both the Python and Java sides to ensure that sufficient information for things like partitioning is available in the JVM.

PySpark diagram
Figure 7-1. PySpark diagram
Warning

After the initial read from persistent storage (like HDFS or S3) and between any shuffles, the data on the workers needs to be passed between the JVM and Python.

Tip

Is IPython your jam? In Spark 2.0+ the old syntax to get an IPython notebook has changed from IPYTHON_OPTS="notebook" to PYSPARK_DRIVER_PYTHON="ipython" PYSPARK_DRIVER_PYTHON_OPTS="notebook".

PySpark RDDs

Transferring the data to and from the JVM and starting the Python executor has significant overhead. Using the DataFrame/Dataset API avoids many of the performance challenges with the PySpark RDD API by keeping the data inside the JVM for as long as possible.

Copying the data from the JVM to Python is done using sockets and pickled bytes. A more general version of this, for talking to programs in other languages, is available through the PipedRDD interface illustrated in “Using Pipe and Friends”.

Since piping the data back and forth for each transformation would be expensive, PySpark pipelines Python transformations inside of the Python interpreter when possible, so a filter followed by a map will be chained together on the iterator of Python objects using a specialized PipelinedRDD. Even when the data has to be shuffled and PySpark is unable to chain our transformations inside of a single worker VM, the Python interpreter can be reused, so the interpreter startup overhead doesn’t slow us down further.
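
For example, in the following minimal sketch (assuming an existing SparkContext named sc), the filter and the map are evaluated together in a single pass through the Python worker:

# Adjacent Python transformations are chained rather than piped separately.
rdd = sc.parallelize(range(1000))
evens_squared = rdd.filter(lambda x: x % 2 == 0).map(lambda x: x * x)
# The chained result is a PipelinedRDD in current PySpark versions.
print(type(evens_squared).__name__)
print(evens_squared.take(3))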

This is only part of the puzzle. Normal PipedRDDs work on Strings, which can’t easily be shuffled since there is no inherent key. The approach taken in PySpark, and mirrored in many other language bindings, is a special PairwiseRDD in which the key must be a long and only the key is deserialized, using custom Scala code to parse the pickled Python value. This deserialization is not overly expensive, but it does serve to illustrate that, for the most part, Spark’s Scala side treats the results of Python as opaque byte arrays.

Warning

Since there is some overhead associated with serialization and deserialization, PySpark uses a batch serializer, and this can occasionally result in unexpected effects (e.g., when repartitioning, PySpark will not split up elements that were serialized in the same batch).

For all its simplicity, this approach to integration works surprisingly well, with the majority of operations on Scala RDDs available in Python. Some of the more difficult areas are interacting with libraries, such as MLlib, and loading and saving data from different sources.

Interacting with different formats is another restriction, as much of Spark’s load/save code is based on Hadoop’s Java interfaces. This means that any data loaded is initially loaded into the JVM and then transferred to Python.

For interacting with MLlib, generally two approaches have been taken: either a specialized data type is used in PySpark with equivalent Scala decoders, or the algorithm is reimplemented in Python. These problems are avoided with Spark ML, which uses the DataFrame/Dataset interface that generally keeps the data stored in the JVM.

PySpark DataFrames and Datasets

DataFrames and Datasets avoid many of the performance downsides of the Python RDD API by keeping the data inside the JVM for as long as possible. The same benchmark we did to illustrate DataFrames’ general improvement over RDDs (Figure 3-1) shows a greater difference when rerun in Python (Figure 7-2).

RDD versus DataFrame performance Python
Figure 7-2. Spark SQL performance in Python

For many operations on DataFrames and Datasets, the data may never actually need to leave the JVM, although using Python UDFs, UDAFs, or lambdas naturally requires transferring some of the data between the JVM and the Python workers. This results in a simplified architecture diagram for many operations, which instead of Figure 7-1, looks like Figure 7-3.

PySpark SQL diagram
Figure 7-3. PySpark SQL diagram
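
To make the trade-off concrete, the following sketch (assuming a DataFrame df with a string column called name) compares a Python UDF, which forces rows out to the Python workers, with the built-in length function, which stays entirely in the JVM:

from pyspark.sql.functions import col, length, udf
from pyspark.sql.types import IntegerType

# A Python UDF: rows must be serialized out to a Python worker and back.
strlen_py = udf(lambda s: len(s), IntegerType())
df.select(strlen_py(col("name"))).explain()

# The equivalent built-in function is evaluated inside the JVM.
df.select(length(col("name"))).explain()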
Note

PySpark doesn’t use Jython because it has been found that a lot of Python users need access to libraries, like numpy, scipy, and pandas, which do not work well in Jython.

Tip

Some early work is being investigated to see if Jython can be used to accelerate Python UDFs, which don’t depend on C extensions. See SPARK-15369 for updates.

Accessing the backing Java objects and mixing Scala code

An important implication of the PySpark architecture is that many of Spark’s Python classes simply exist as wrappers to translate your Python calls to the JVM.

If you work with Scala/Java developers and you wish to collaborate, preexisting wrappers won’t exist to call your own code—but you can register Java/Scala UDFs and then use them from Python. Starting in Spark 2.1 this can be done with the registerJavaFunction utility on the sqlContext.
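
A minimal sketch of what this looks like from the Python side, assuming a hypothetical Java/Scala UDF class com.example.udfs.StrLen (implementing org.apache.spark.sql.api.java.UDF1) is already on the classpath:

from pyspark.sql.types import IntegerType

# Register the JVM UDF under a name callable from Python SQL (Spark 2.1+).
# com.example.udfs.StrLen is a placeholder for your own UDF class.
sqlContext.registerJavaFunction("strLenJvm", "com.example.udfs.StrLen",
                                IntegerType())
sqlContext.sql("SELECT strLenJvm('panda')").show()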

Sometimes these wrappers don’t do everything you need, and since Python doesn’t have strong protections around accessing private methods, you can jump directly into the JVM. The same techniques can be used to call your own JVM code, and with a bit of work translate the results into Python objects.

Warning

While the Py4J API is accessible, these techniques depend on implementation details of PySpark, and these implementation details may change between releases.

Thinking back to “Large Query Plans and Iterative Algorithms”, we suggested that it was important to use the JVM version of DataFrames and RDDs to cut the query plan. This is a workaround for when a query plan becomes too large for the Spark SQL optimizer to process; by putting an RDD in the middle, the SQL optimizer can’t see back past the point where the data is in an RDD. While you could accomplish the same thing using the public Python APIs, you would lose much of the advantage of DataFrames as all of the data would need to be round-tripped through the Python workers. Instead, by using some of the internal APIs, you can cut the lineage from Python while keeping the data in the JVM (as shown in Example 7-5).

Example 7-5. Cut large DataFrame query plan with Python
from pyspark.sql.dataframe import DataFrame

def cutLineage(df):
    """
    Cut the lineage of a DataFrame - used for iterative algorithms

    .. Note: This uses internal members and may break between versions
    >>> df = rdd.toDF()
    >>> cutDf = cutLineage(df)
    >>> cutDf.count()
    3
    """
    jRDD = df._jdf.toJavaRDD()
    jSchema = df._jdf.schema()
    jRDD.cache()
    sqlCtx = df.sql_ctx
    try:
        javaSqlCtx = sqlCtx._jsqlContext
    except:
        javaSqlCtx = sqlCtx._ssql_ctx
    newJavaDF = javaSqlCtx.createDataFrame(jRDD, jSchema)
    newDF = DataFrame(newJavaDF, sqlCtx)
    return newDF

In general, the convention for most Python objects is _j[shortname] to access the underlying Java version. So, for example, the SparkContext has _jsc to get at the underlying Java SparkContext. This is only available on the driver program, so if any PySpark objects are sent to the workers you won’t be able to access the underlying Java component and large parts of the API will not work.
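
For example, on the driver (assuming an existing DataFrame named df):

# Each Python wrapper keeps a handle to its Java counterpart (driver only).
jsc = sc._jsc    # the underlying JavaSparkContext
jdf = df._jdf    # the underlying Java DataFrame/Dataset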

Note

The Python APIs generally wrap Java versions of the API rather than directly wrapping the Scala versions.

If you want to access a JVM Spark class that does not already have a Python wrapper, you can directly use the Py4J gateway on the driver. The SparkContext contains a reference to the gateway in _gateway. Arbitrary Java objects can be accessed with sc._gateway.jvm.[fulljvmclassname].
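
As a small illustration, the gateway can be used to poke at ordinary JVM classes from the driver:

# Access arbitrary JVM classes through the Py4J gateway (driver only).
jvm = sc._gateway.jvm
print(jvm.java.lang.System.getProperty("java.version"))
print(jvm.java.lang.Runtime.getRuntime().maxMemory())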

Warning

Py4J depends heavily on reflection to determine which methods to call. This is normally not a problem, but can become confusing with numeric types. Attempting to call a Scala function expecting a Long with an Integer will result in an error message about not being able to find the method, even though in Python the distinction normally would not matter.

The same technique works for your own Scala classes provided they are on the class path. You can add JARs to the class path with spark-submit with --jars or by setting the spark.driver.extraClassPath configuration property. Example 7-6, which we used to generate Figure 7-2, is intentionally structured to use the existing Scala code to generate the performance testing data.

Example 7-6. Calling non-Spark JVM classes with Py4J
    sc = sqlCtx._sc
    # Get the SQL Context, 2.1, 2.0 and pre-2.0 syntax - yay internals :p
    try:
        try:
            javaSqlCtx = sqlCtx._jsqlContext
        except:
            javaSqlCtx = sqlCtx._ssql_ctx
    except:
        javaSqlCtx = sqlCtx._jwrapped
    jsc = sc._jsc
    scalasc = jsc.sc()
    gateway = sc._gateway
    # Call a java method that gives us back an RDD of JVM Rows (Int, Double)
    # While Python RDDs are wrapped Java RDDs (even of Rows) the contents are
    # different, so we can't directly wrap this.
    # This returns a Java RDD of Rows - normally it would be better to
    # return a DataFrame directly, but for illustration we will work
    # with an RDD of Rows.
    java_rdd = (gateway.jvm.com.highperformancespark.examples.
                tools.GenerateScalingData.
                generateMiniScaleRows(scalasc, rows, numCols))
    # Schemas are serialized to JSON and sent back and forth
    # Construct a Python Schema and turn it into a Java Schema
    schema = StructType([
        StructField("zip", IntegerType()),
        StructField("fuzzyness", DoubleType())])
    # 2.1 / pre-2.1
    try:
        jschema = javaSqlCtx.parseDataType(schema.json())
    except:
        jschema = sqlCtx._jsparkSession.parseDataType(schema.json())
    # Convert the Java RDD to Java DataFrame
    java_dataframe = javaSqlCtx.createDataFrame(java_rdd, jschema)
    # Wrap the Java DataFrame into a Python DataFrame
    python_dataframe = DataFrame(java_dataframe, sqlCtx)
    # Convert the Python DataFrame into an RDD
    pairRDD = python_dataframe.rdd.map(lambda row: (row[0], row[1]))
    return (python_dataframe, pairRDD)
Warning

Attempting to use the Py4J bridge inside of your transformations will fail at runtime.

While many of the Python classes are simply wrappers of Java objects, not all Java objects can directly be wrapped into Python objects and then used in Spark. For example, objects in PySpark RDDs are represented as pickled strings, which can only be easily parsed in Python. Thankfully, DataFrames are standardized between the languages, so provided you can convert your data into a DataFrame, you can then wrap it in Python and use it directly as a Python DataFrame or convert the Python DataFrame to a Python RDD.

Tip

Scala UDFs and UDAFs can be used from Python without having to go through the Py4J API.

PySpark dependency management

Often a large part of the reason one wants to use a language other than Scala is for the libraries that are available with that language. In addition to language-specific libraries, you may need to include libraries for Spark itself to use, especially when working with different data formats. There are a few different options for using both Spark-specific and language-specific libraries in PySpark.

Spark Packages is a system that allows us to easily include JVM dependencies with Spark. A common reason for wanting additional JVM libraries in PySpark is support for additional data formats.

If you are working in the Scala shell you can use the --packages command-line argument to specify the Maven coordinates of a package you want in the shell. If you are building a Scala package, you can also add any requirements to your assembly JAR.

For Python, you can create a Java or Scala project with your JVM dependencies and add the JAR with --jars. If you’re working in the PySpark shell, command-line arguments aren’t allowed, so you can instead specify the spark.jars.packages configuration variable.
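
In a standalone PySpark program (rather than the shell), one way to set this is on the session builder before the SparkContext starts. The following sketch uses the spark-avro package coordinates as an example:

from pyspark.sql import SparkSession

# Spark resolves the package (and its dependencies) from Maven at startup.
spark = (SparkSession.builder
         .appName("packages-example")
         .config("spark.jars.packages", "com.databricks:spark-avro_2.11:4.0.0")
         .getOrCreate())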

When using Spark Packages the dependencies are automatically fetched from Maven and distributed to the cluster. If your JVM dependency is not available in Maven, you can use the same technique we discuss next for adding local Python dependencies.

Adding local dependencies with PySpark can be done at both job submission time and dynamically using the SparkContext. Local dependencies can be .jar files, for JVM requirements, or .zip and .egg for Python dependencies, which are automatically added to the PYTHONPATH.
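
A short sketch of both approaches, with hypothetical file names:

# At submission time:
#   spark-submit --py-files deps.zip --jars extra-format.jar my_job.py
# Or dynamically from the driver once the SparkContext exists:
sc.addPyFile("/path/to/deps.zip")        # added to the PYTHONPATH on the workers
sc.addFile("/path/to/helper_script.pl")  # plain files, e.g., for use with pipe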

Note

There is currently work under way to allow Python Spark programs to specify required pip packages and have them auto installed, but the proposal has not yet been accepted. See the pull request and SPARK-5929 for the status of this proposal.

For individuals working with a CDH cluster, it is now possible to easily add packages with Anaconda. Cloudera’s post Making Python on Apache Hadoop Easier details how to install the packages on your cluster. To make the resulting packages accessible to Apache Spark, all you need to do is set the shell environment variable PYSPARK_PYTHON to /opt/cloudera/parcels/Anaconda/bin/python either with export in your shell profile or in your spark-env.sh file.

If none of the above works for your cluster configuration, there are a few remaining options, all of which are somewhat less than ideal. The simplest, but very hacky, approach is to have your transformations explicitly import the package and, on failure, perform a pip installation. Similar approaches can be taken with broadcast variables or a setup map at the start of the program. Failing that, you can ask your cluster administrator to install the package systemwide with parallel-ssh or similar, as shown in Example 7-7.

Example 7-7. Parallel ssh install pip packages
parallel-ssh -h ./conf/slaves "pip install <package-name>"

Installing PySpark

First-party languages for Spark don’t require any separate installation, but, as mentioned in the discussion of Python packages, Python has its own mechanisms for package management.

Installation with pip was added in PySpark version 2.1; starting with that version you can download the PySpark package from the Apache download mirror and run pip install pyspark-2.1.0.tar.gz, which also allows virtualenv support. PySpark 2.2.0 (and forward) is published directly to PyPI, allowing for an even simpler pip install pyspark. Once you have PySpark pip installed you can then start your favorite Python interpreter and import pyspark like any other package, or start the PySpark shell with pyspark.

It’s important to note that pip installing Spark is optional. If you wish you can run PySpark from a regular Spark setup without pip installation (although then you must use spark-submit or pyspark from the Spark bin directory).
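
Once installed, a minimal sketch of starting Spark from a plain Python interpreter (assuming a local Java installation) looks like the following:

from pyspark.sql import SparkSession

# The pip-installed package bundles the jars needed to run in local mode.
spark = (SparkSession.builder
         .master("local[*]")
         .appName("pip-example")
         .getOrCreate())
print(spark.range(100).count())
spark.stop()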

How SparkR Works

SparkR takes a similar approach to PySpark, but does not currently expose the ability to perform arbitrary R code in the workers. While a similar PipedRDD wrapper exists for R as it does for Python, it is kept internal and the only public interface for working with R is through DataFrames.

Warning

Of the directly supported languages, SparkR is the furthest away from Scala Spark in terms of feature completeness. This gap will likely close over time, but be careful when selecting SparkR to ensure it has the features you need. The API documentation will give you an idea if what you are looking for is already available.

To give you an idea of what the SparkR interface looks like, the standard word count example has been rewritten in R in Example 7-8.

Example 7-8. SparkR word count
library(SparkR)

# Setup SparkContext & SQLContext
sc <- sparkR.init(appName="high-performance-spark-wordcount-example")

# Initialize SQLContext
sqlContext <- sparkRSQL.init(sc)

# Load some simple data

df <- read.text(fileName)

# Split the words
words <- selectExpr(df, "split(value, ' ') as words")

# Compute the count
explodedWords <- select(words, alias(explode(words$words), "words"))
wc <- agg(groupBy(explodedWords, "words"), "words" = "count")


# Attempting to push an array back fails
# resultingSchema <- structType(structField("words", "array<string>"))
# words <- dapply(df, function(line) {
#   y <- list()
#   y[[1]] <- strsplit(line[[1]], " ")
# }, resultingSchema)
# Also attempting even the identity transformation on a DF from read.text fails
# in Spark 2.0-preview (although works fine on other DFs).

# Display the result
showDF(wc)

To execute your own custom R code you can use the dapply method on DataFrames as illustrated in Example 7-9. SparkR’s custom code execution support has a long way to go, as illustrated by the difficulty of attempting to perform a word count with dapply in Example 7-8.

Example 7-9. SparkR arbitrary code with DataFrames
library(SparkR)

# Setup SparkContext & SQLContext
sc <- sparkR.init(appName="high-performance-spark-wordcount-example")

# Initialize SQLContext
sqlContext <- sparkRSQL.init(sc)


# Count the number of characters - note this fails on the text DF due to a bug.
df <- createDataFrame (sqlContext,
  list(list(1L, 1, "1"),
  list(2L, 2, "22"),
  list(3L, 3, "333")),
  c("a", "b", "c"))
resultingSchema <- structType(structField("length", "integer"))
result <- dapply(df, function(row) {
  y <- list()
  y <- cbind(y, nchar(row[[3]]))
}, resultingSchema)
showDF(result)

Internally dapply is implemented in a similar way to Python’s UDF support, but since the RDD API isn’t exposed it leaves more potential for future optimizations and encourages development with the more optimized DataFrame APIs.

Warning

As with PySpark, arbitrary non-JVM code execution is slower than traditional Scala Spark code.

Tip

SparkR isn’t the only interface for running Spark and R together. sparklyr is a third-party library from RStudio that is also quite popular. From a performance point of view, it shares the same underlying mechanisms as SparkR for interfacing with the JVM.

Spark.jl (Julia Spark)

Spark.jl is one of the newer projects to provide bindings for Spark and, as such, so far supports only a limited subset of the API. Spark.jl is incredibly easy to install (see Example 7-10), and it automatically installs a supported version of Spark alongside it. The general design of Spark.jl is similar to that of PySpark, with a custom implementation of the PipedRDD that is able to parse limited amounts of serialized data from Julia implemented inside of the JVM. The same general performance caveats of using PySpark also apply to Spark.jl.

Example 7-10. Julia Spark install
Pkg.clone("https://github.com/dfdx/Spark.jl")
Pkg.build("Spark")
# we also need latest master of JavaCall.jl
Pkg.checkout("JavaCall")
Warning

As of this writing, named Julia functions cannot be fully serialized, so functions used inside of transformations should be anonymous.

Until keyed operations are supported in Spark.jl we can’t even build the simple word count example. Namely, reduceByKey, which is required for the shuffle, is missing, and while others like flatMap are also missing, they can be replaced with mapPartitions. For now Spark.jl is an early-stage project that shows promise but is not ready for use.

How Eclair JS Works

Eclair JS takes a different approach than R and Python support, mostly staying inside the JVM except for the driver program. Eclair JS runs JavaScript in both the JVM and V8 JavaScript engine, with the functions inside of the transformations being evaluated by the JVM using Nashorn. The split between driver-side and worker-side evaluation allows for fast integration on the workers and NodeJS bindings on the driver. See Figure 7-4 for a diagram of this.

Eclair JS design
Figure 7-4. Eclair JS diagram

This somewhat unorthodox approach means that certain library functions may not be available inside of the transformations, but saves us from the double serialization problem found in PySpark and SparkR UDFs. The node driver communicates using Apache Toree to send the required functions to the JVM, which then sends them to the workers.

Installing Eclair JS is easy relative to other languages as the worker side is able to run without any extra packages. The getting started guide walks you through the setup process.

Warning

While Eclair JS presents some interesting novel ideas, it has been deprecated.

Spark on the Common Language Runtime (CLR)—C# and Friends

Microsoft’s Mobius project provides C# bindings for working with Apache Spark. The general design is similar to that of PySpark, with the internals of PythonRDD instead communicating with the CLR. As with PySpark, RDD transformations involve copying the data from the JVM, and DataFrame transformations that don’t use UDFs in C# don’t require copying the data on the workers (or even launching the CLR). If you are curious about using Mobius you can check out the design documents and examples.

Calling Other Languages from Spark

In addition to using other languages to call Spark, we can call other languages from Spark.

Using Pipe and Friends

If there aren’t existing wrappers for the language you are working with, one of the simplest options is using Spark’s pipe interface. To use the pipe interface you start by converting your RDDs into a format in which they can be sent over a Unix pipe. Often simple formats like JSON or CSV are used for communicating, as lightweight libraries exist for generating and parsing these records in many languages.

Let’s return to the Goldilocks example from “The Goldilocks Example”. Suppose that in addition to optimal panda porridge temperature, you also wanted to find out which pandas had been commenting on Spark PRs;4 you might cook up a quick little Perl script, as in Example 7-11. Later on, if you want to use this script in Spark you can use the pipe command to call your Perl script from the workers. Since pipe only works with strings, you will need to format your inputs as a string and parse the result string back into the correct data type, as in Example 7-12.

Example 7-11. Perl script to be called from pipe
#!/usr/bin/perl
use strict;
use warnings;

use Pithub;
use Data::Dumper;

# Find all of the commenters on an issue
my $user = $ENV{'user'};
my $repo = $ENV{'repo'};
my $p = Pithub->new(user => $user, repo => $repo);
while (my $id = <>) {
    chomp ($id);
    my $issue_comments = $p->issues->comments->list(issue_id => $id);
    print $id;
    while (my $comment = $issue_comments->next) {
	print " ".$comment->{"user"}->{"login"};
    }
    print "\n";
}
Example 7-12. Using pipe (from Scala Spark) to talk to a Perl program on the workers
  def lookupUserPRS(sc: SparkContext, input: RDD[Int]): RDD[(Int, List[String])] = {
    // Copy our script to the worker nodes with sc.addFile
    // Add file requires absolute paths
    val distScriptName = "ghinfo.pl"
    val userDir = System.getProperty("user.dir")
    val localScript = s"${userDir}/src/main/perl/${distScriptName}"
    sc.addFile(localScript)

    // Pass environment variables to our worker
    val environmentVars = Map("user" -> "apache", "repo" -> "spark")
    val result = input.map(x => x.toString)
      .pipe(SparkFiles.get(distScriptName), environmentVars)
    // Parse the results
    result.map{record =>
      val elems: Array[String] = record.split(" ")
      (elems(0).toInt, elems.slice(1, elems.size).sorted.distinct.toList)
    }
  }
Tip

Spark will not automatically copy your script to the worker machines, so if you are calling a custom program you can use the sc.addFile interface as in Example 7-12. Otherwise (e.g., if you are calling a systemwide program), just skip that part.

Note

PySpark and SparkR both use specialized versions of PipedRDD for communication on the workers.

Warning

Make sure that you handle empty partitions, since your program will be called even for empty partitions (although this functionality may change in future versions).

JNI

The Java Native Interface (JNI) is another option for interfacing with other languages. JNI can work well for calling certain C/C++ libraries, as well as other statically compiled languages like FORTRAN. While JNI doesn’t exactly suffer from double serialization in the same way calling PySpark or using pipe does, you still need to copy your data out of the JVM and back.

Tip

This is why some libraries, such as JBLAS, implement some components inside of the JVM: once the copy cost is added, the performance benefit of native code can go away.

To illustrate how to use JNI with Spark, consider calling a very simple C function that sums all of the nonzero inputs. Its function signature is shown in Example 7-13.

Example 7-13. Simple C header
#ifndef _SUM_H
#define _SUM_H

int sum(int input[], int num_elem);

#endif /* _SUM_H */

You can write the JNI specification to call this in either Java (Example 7-14) or Scala (Example 7-15). Although the tooling for Java can be a bit simpler, there is no significant difference between them.

Example 7-14. Simple Java JNI
class SumJNIJava {
  public static native int sum(int[] array);
}
Example 7-15. Simple Scala JNI
class SumJNI {
  @native def sum(n: Array[Int]): Int
}
Tip

Manually writing wrappers takes effort. Check out SWIG to automatically generate parts of your bindings.

Once you have your C function and your JNI class specification, you need to generate your class files and from them generate the binder heading (see Example 7-16). The javah command will take the class files and generate headers that are then used to create a C-side wrapper.

Example 7-16. Generate header with the command-line interface
javah -classpath ./target/examples-0.0.1.jar \
com.highperformancespark.examples.ffi.SumJNI

For those of you building with SBT, Jakob Odersky’s sbt-jni package makes it easy to integrate your native code with your Scala project. sbt-jni is published as an SBT plug-in, like sbt-spark-package, and is included by adding an entry to project/plugins.sbt as shown in Example 7-17.

Example 7-17. Add sbt-jni plug-in to project/plugins.sbt
addSbtPlugin("ch.jodersky" %% "sbt-jni" % "1.0.0-RC3")

sbt-jni simplifies generating the header file by adding the javah target to sbt, which will generate the header files and place them in ./target/native/include/.

Once we have our header file we need to write a wrapper in C. The generated header file shouldn’t be modified, but rather imported into our shim as shown in Example 7-18.

Example 7-18. JNI C shim
#include "sum.h"
#include "include/com_highperformancespark_examples_ffi_SumJNI.h"
#include <ctype.h>
#include <jni.h>

/*
 * Class:     com_highperformancespark_examples_ffi_SumJNI
 * Method:    sum
 * Signature: ([I)I
 */
JNIEXPORT jint JNICALL Java_com_highperformancespark_examples_ffi_SumJNI_sum
(JNIEnv *env, jobject obj, jintArray ja) {
  jsize size = (*env)->GetArrayLength(env, ja);
  jint *a = (*env)->GetIntArrayElements(env, ja, 0);
  jint result = sum(a, size);
  // Release the array elements so the JVM-side copy isn't leaked.
  (*env)->ReleaseIntArrayElements(env, ja, a, 0);
  return result;
}

sbt-jni also simplifies building and packaging native code, adding nativeCompile, javah, and packageBin to allow you to easily build an assembly JAR with both your native files and Java artifacts. For sbt-jni to build your native code (in addition to the JVM code), you need to provide a Makefile. If you are starting with a new project, the nativeInit CMake target will generate a skeleton CMakeLists.txt file you can use as a basis for your native build.

Tip

In our example project, we’ve built the native code along with the Scala code. Alternatively, especially if you plan to support multiple architectures, you may wish to create a separate package for your native code.

If your artifact is built with sbt-jni you can use the nativeLoader decorator from ch.jodersky.jni.nativeLoader to automatically load your native code as needed. In the example we’ve been working on, our library is called libhigh-performance-spark0 so we can have it automatically loaded by adding the decorator to our SumJNI class, as in Example 7-19.

Example 7-19. Native Loader decorator
@nativeLoader("high-performance-spark0")

If you are working in Java, or just want more control, you can use System.loadLibrary, which takes a library name and searches java.library.path, or System.load, which takes an absolute path.

Tip

Leave off the “lib” prefix, which loadLibrary (and sbt-jni) adds automatically, or you will get confusing runtime linking errors.

Tip

The Oracle JNI specification can be a useful reference.

Warning

If your native library isn’t packaged in your JAR, you need to make sure the JVM running the Spark worker is able to call it. If your library is already installed on the workers, you can add -Djava.library.path=... to your spark.executor.extraJavaOptions.

Java Native Access (JNA)

Java Native Access (JNA) is a community-driven alternative to JNI to allow calling of native code, ideally without all of the boilerplate required by JNI. Although JNA is a community package this does not mean it is low quality; it is used by a variety of mature projects and has been used by Spark application developers. We can use JNA to call our previous example in both Scala (Example 7-20) and Java.

Example 7-20. Scala simple JNA
import com.sun.jna._
object SumJNA {
  Native.register("high-performance-spark0")
  @native def sum(n: Array[Int], size: Int): Int
}

It’s important to note that these JNA examples skip the requirement for writing the JNI wrapper (as in Example 7-18) and instead directly call the C function for us. While SWIG can do a good job of generating much of the JNI wrappers, for some this is a compelling reason to use JNA over JNI.

Tip

When using JNA, jna.boot.library.path allows you to add libraries to the search path before the system library path.

Underneath Everything Is FORTRAN

A surprising number of numeric computing libraries still have FORTRAN implementations. Thankfully many of these libraries already have Java or Python wrappers, which greatly simplify our access. These libraries often can make intelligent decisions about which operations are worth the overhead of copying our data into FORTRAN and which operations make more sense to be implemented in the host language. Not all FORTRAN code already has wrappers, however, and you may find yourself with FORTRAN code you need to interface with yourself.

The general process is to first create a C/C++ wrapper that exposes the FORTRAN code for Java to call, and then link the C/C++ code together with the FORTRAN code. Continuing the sum example in FORTRAN (Example 7-21), you would create a C wrapper like Example 7-22, and then follow the existing steps for calling a C library in “JNI”.

Example 7-21. FORTRAN sum function
       INTEGER FUNCTION SUMF(N,A) BIND(C, NAME='sumf')
       INTEGER A(N)
       SUMF=SUM(A)
       END
Example 7-22. C wrapper for FORTRAN sum function
// Fortran routine
extern int sumf(int *, int[]);

// Call the fortran code which expects by reference size
int wrap_sum(int input[], int size) {
  return sumf(&size, input);
}
Tip

If you like sbt-jni you can extend the generated CMake file to also compile your FORTRAN code.

These wrappers can also be automatically generated with programs like fortrwrap, or skipped entirely with JNA. Calling the FORTRAN function with JNA is very similar to calling the C function, as shown in Example 7-23.

Example 7-23. FORTRAN SUMF through JNA
import com.sun.jna._
import com.sun.jna.ptr._
object SumFJNA {
  Native.register("high-performance-spark0")
  @native def sumf(n: IntByReference, a: Array[Int]): Int
  def easySum(size: Int, a: Array[Int]): Int = {
    val ns = new IntByReference(size)
    sumf(ns, a)
  }
}

Calling FORTRAN code from the JVM is more difficult than calling C code. If available, it’s often better to use existing wrappers as they can make intelligent decisions about which components to execute in FORTRAN rather than in the JVM.

Getting to the GPU

GPUs are another great way of working with parallel, numeric computing problems. They have been shown to be particularly effective at certain types of machine learning problems. Some single-node distributed systems exist just to coordinate the work of multiple GPUs. If your problem is well suited to GPU acceleration, the performance improvement can be huge (SparkGPULR showed a 3× improvement).

The GPUEnabler Spark package exists to simplify interfacing Spark with CUDA. The package simplifies the setup of JCUDA and automates converting your data into a columnar format for working on GPUs.

Tip

Some people have also used aparapi to automate compilation of Java code to OpenCL, although no packages exist to simplify the integration currently.

At present there is no unified way inside of Apache Spark to perform GPU acceleration, with competing proposals from IBM (spark-gpu), Adobe (spark-gpu), and others.

Tip

For those interested you may wish to follow SPARK-12620 and friends.

The Future

Tungsten has the ability to store data off-heap with Spark, but the data format is currently not stable or sufficiently documented to enable shared access from other languages. Two possibilities exist to improve this: either the standardization of Tungsten, SPARK-9697, or the integration of Arrow in Python and Spark, SPARK-13534. Hopefully future editions of this book will be able to report the awesomeness that these changes have enabled.

Conclusion

Writing high-performance Spark code need not be limited to Scala, let alone the JVM (although it can certainly make things easier). Spark has a wide variety of language bindings, both built-in and third party, and can interface with even more languages using JNI, JNA, pipes, or sockets. For some operations, the cost of copying the data outside of the JVM and back can be more expensive than just doing the operation in the JVM—even with specialized libraries—so it is important to consider the complexity of your transformations before going outside of the JVM. While not currently supported, Tungsten’s off-heap support may eventually standardize in such a way as to better support language interoperability on the workers.

1 There are multiple competing R APIs, but for the purposes of performance they share the same underlying design.

2 Just because support is first party does not mean it will be fast; in some cases third-party bindings have taken interesting work to minimize overhead that has not been implemented in the first-party languages.

3 CUDA is a specialized language for parallel GPU programming from NVIDIA.

4 This is somewhat of a stretch as far as the relationship to Goldilocks goes, but you know.
