Spark start and data load

Now it's time to fire up a Spark cluster which will give us all the functionality of Spark while simultaneously allowing us to use H2O algorithms and visualize our data. As always, we must download Spark 2.1 distribution from http://spark.apache.org/downloads.html and declare the execution environment beforehand. For example, if you download spark-2.1.1-bin-hadoop2.6.tgz from the Spark download page, you can prepare the environment in the following way:

tar -xvf spark-2.1.1-bin-hadoop2.6.tgz 
export SPARK_HOME="$(pwd)/spark-2.1.1-bin-hadoop2.6 

When the environment is ready, we can start the interactive Spark shell with Sparkling Water packages and this book package:

export SPARKLING_WATER_VERSION="2.1.12"
export SPARK_PACKAGES=
"ai.h2o:sparkling-water-core_2.11:${SPARKLING_WATER_VERSION},
ai.h2o:sparkling-water-repl_2.11:${SPARKLING_WATER_VERSION},
ai.h2o:sparkling-water-ml_2.11:${SPARKLING_WATER_VERSION},
com.packtpub:mastering-ml-w-spark-utils:1.0.0"
    
    
$SPARK_HOME/bin/spark-shell       
    
            --master 'local[*]' 
            --driver-memory 4g 
            --executor-memory 4g 
            --packages "$SPARK_PACKAGES"
  
H2O.ai is constantly keeping up with the latest releases of the Spark project to match the version of Sparkling Water. The book is using Spark 2.1.1 distribution and Sparkling Water 2.1.12. You can find the latest version of Sparkling Water for your version of Spark at http://h2o.ai/download/

This case is using the provided Spark shell which downloads and uses Spark packages of Sparkling Water version 2.1.12. The packages are identified by Maven coordinates - in this case ai.h2o represents organization ID, sparkling-water-core identifies Sparkling Water implementation (for Scala 2.11, since Scala versions are not binary compatible), and, finally, 2.1.12 is a version of the package. Furthermore, we are using this book -specific package which provides handful utilities.

The list of all published Sparkling Water versions is also available on Maven central: http://search.maven.org

The command starts Spark in a local mode - that is, the Spark cluster has a single node running on your computer. Assuming you did all this successfully, you should see the standard Spark shell output like this:

Figure 2 - Notice how the shell starts up showing you the version of Spark you are using.
The provided book source code provides for each chapter the command starting the Spark environment; for this chapter, you can find it in the chapter2/bin folder.

The Spark shell is a Scala - based console application that accepts Scala code and executes it in an interactive way. The next step is to prepare the computation environment by importing packages which we are going to use during our example.

import org.apache.spark.mllib 
import org.apache.spark.mllib.regression.LabeledPoint 
import org.apache.spark.mllib.linalg._ 
import org.apache.spark.mllib.linalg.distributed.RowMatrix 
import org.apache.spark.mllib.util.MLUtils 
import org.apache.spark.mllib.evaluation._ 
import org.apache.spark.mllib.tree._ 
import org.apache.spark.mllib.tree.model._ 
import org.apache.spark.rdd._ 

Let's first ingest the .csv file that you should have downloaded and do a quick count to see how much data is in our subset. Here, please notice, that the code expects the data folder "data" relative to the current process working directory or location specified:

val rawData = sc.textFile(s"${sys.env.get("DATADIR").getOrElse("data")}/higgs100k.csv")
println(s"Number of rows: ${rawData.count}")

The output is as follows:

You can observe that execution of the command sc.textFile(...) took no time and returned instantly, while executing rawData.count took the majority amount of time. This exactly demonstrates the difference between Spark transformations and actions. By design, Spark adopts lazy evaluation - it means that if a transformation is invoked, Spark just records it directly into its so-called execution graph/plan. That perfectly fits into the big data world, since users can pile up transformations without waiting. On the other hand, an action evaluates the execution graph - Spark instantiates each recorded transformation and applies it onto the output of previous transformations. This concept also helps Spark to analyze and optimize an execution graph before its execution - for example, Spark can reorganize the order of transformations or can decide to run transformations in parallel if they are independent.

Right now, we defined a transformation which loads data into the Spark data structure RDD[String] which contains all the lines of input data file. So, let's look at the first two rows:

rawData.take(2) 

The first two lines contain raw data as loaded from the file. You can see that a row is composed of a response column having the value 0,1 (the first value of the row) and other columns having real values. However, the lines are still represented as strings and require parsing and transformation into regular rows. Hence, based on the knowledge of the input data format, we can define a simple parser which splits an input line into numbers based on a comma:

val data = rawData.map(line => line.split(',').map(_.toDouble)) 
 

Now we can extract a response column (the first column in the dataset) and the rest of data representing the input features:

val response: RDD[Int] = data.map(row => row(0).toInt)   
val features: RDD[Vector] = data.map(line => Vectors.dense(line.slice(1, line.size))) 

After this transformation, we have two RDDs:

  • One representing the response column
  • Another which contains dense vectors of numbers holding individual input features

Next, let's look in more detail at the input features and perform some very rudimentary data analysis:

 

val featuresMatrix = new RowMatrix(features) 
val featuresSummary = featuresMatrix.computeColumnSummaryStatistics() 

We converted this vector into a distributed RowMatrix. This gives us the ability to perform simple summary statistics (for example, compute mean, variance, and so on:)

 

 
import org.apache.spark.utils.Tabulizer._ 
println(s"Higgs Features Mean Values = ${table(featuresSummary.mean, 8)}")
 

The output is as follows:

Take a look at following code:

println(s"Higgs Features Variance Values = ${table(featuresSummary.variance, 8)}") 
  

The output is as follows:

In the next step, let's explore columns in more details. We can get directly the number of non-zeros in each column to figure out if the data is dense or sparse. Dense data contains mostly non-zeros, sparse data the opposite. The ratio between the number of non-zeros in the data and the number of all values represents the sparsity of data. The sparsity can drive our selection of the computation method, since for sparse data it is more efficient to iterate over non-zeros only:

val nonZeros = featuresSummary.numNonzeros 
println(s"Non-zero values count per column: ${table(nonZeros, cols = 8, format = "%.0f")}") 

The output is as follows:

However, the call just gives us the number of non-zeros for all column, which is not so interesting. We are more curious about columns that contain some zero values:

val numRows = featuresMatrix.numRows
val numCols = featuresMatrix.numCols
val colsWithZeros = nonZeros
.toArray
.zipWithIndex
.filter { case (rows, idx) => rows != numRows }
println(s"Columns with zeros: ${table(Seq("#zeros", "column"), colsWithZeros, Map.empty[Int, String])}")

In this case, we augmented the original vector of non-zeros by the index of each value and then filter out all the values which are equal to the number of rows in the original matrix. And we get:

We can see that columns 8, 12, 16, and 20 contain some zero numbers, but still not enough to consider the matrix as sparse. To confirm our observation, we can compute the overall sparsity of the matrix (remainder: the matrix does not include the response column):

val sparsity = nonZeros.toArray.sum / (numRows * numCols)
println(f"Data sparsity: ${sparsity}%.2f")

The output is as follows:

And the computed number confirms our former observation - the input matrix is dense.

Now it is time to explore the response column in more detail. As the first step, we verify that the response contains only the values 0 and 1 by computing the unique values inside the response vector:

val responseValues = response.distinct.collect
println(s"Response values: ${responseValues.mkString(", ")}")

The next step is to explore the distribution of labels in the response vector. We can compute the rate directly via Spark:

val responseDistribution = response.map(v => (v,1)).countByKey
println(s"Response distribution: ${table(responseDistribution)}")

The output is as follows:

In this step, we simply transform each row into a tuple representing the row value and 1 expressing that the value occurs once in the row. Having RDDs of pairs, the Spark method countByKey aggregates pairs by a key and gives us a summary of the keys count. It shows that the data surprisingly contains slightly more cases which do represent Higgs-Boson but we can still consider the response nicely balanced.

We can also explore labels distribution visually with help of the H2O library. For that we need to start H2O services represented by H2OContext:

import org.apache.spark.h2o._ 
val h2oContext = H2OContext.getOrCreate(sc) 
 

The code initializes the H2O library and starts H2O services on each node of the Spark clusters. It also exposes an interactive environment called Flow, which is useful for data exploration and model building. In the console, h2oContext prints the location of the exposed UI:

h2oContext: org.apache.spark.h2o.H2OContext =  
Sparkling Water Context: 
 * H2O name: sparkling-water-user-303296214 
 * number of executors: 1 
 * list of used executors: 
  (executorId, host, port) 
  ------------------------ 
  (driver,192.168.1.65,54321) 
  ------------------------ 
  Open H2O Flow in browser: http://192.168.1.65:54321 (CMD + click in Mac OSX) 

Now we can directly open the Flow UI address and start exploring the data. However, before doing that, we need to publish the Spark data as an H2O frame called response:

val h2oResponse = h2oContext.asH2OFrame(response, "response")
If you import implicit conversions exposed by H2OContext, you will be able to invoke transformation transparently based on the defined type on the left-side of assignment:

For example:

import h2oContext.implicits._ 
val h2oResponse: H2OFrame = response 

Now it is time to open the Flow UI. You can open it directly by accessing the URL reported by H2OContext or by typing h2oContext.openFlow in the Spark shell.

Figure 3 - Interactive Flow UI

The Flow UI allows for interactive work with the stored data. Let,s look at which data is exposed for the Flow by typing getFrames into the highlighted cell:

Figure 4 - Get list of available H2O frames

By clicking on the response field or typing getColumnSummary "response", "values", we can get visual confirmation about the distribution of values in the response column and see that the problem is slightly imbalanced:

Figure 5 - Statistical properties of column named "response".
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset