Chapter 4. Supervised and Unsupervised Learning

I covered the basics of the MLlib library in the previous chapter, but MLlib, at least at the time of writing this book, is more like a fast-moving target that is gaining the lead rather than a well-structured implementation that everyone uses in production or even has a consistent and tested documentation. In this situation, as people say, rather than giving you the fish, I will try to focus on well-established concepts behind the libraries and teach the process of fishing in this book in order to avoid the need to drastically modify the chapters with each new MLlib release. For better or worse, this increasingly seems to be a skill that a data scientist needs to possess.

Statistics and machine learning inherently deal with uncertainty, due to one or another reason we covered in Chapter 2, Data Pipelines and Modeling. While some datasets might be completely random, the goal here is to find trends, structure, and patterns beyond what a random number generator will provide you. The fundamental value of ML is that we can generalize these patterns and improve on at least some metrics. Let's see what basic tools are available within Scala/Spark.

In this chapter, I am covering supervised and unsupervised leaning, the two historically different approaches. Supervised learning is traditionally used when we have a specific goal to predict a label, or a specific attribute of a dataset. Unsupervised learning can be used to understand internal structure and dependencies between any attributes of a dataset, and is often used to group the records or attributes in meaningful clusters. In practice, both methods can be used to complement and aid each other.

In this chapter, we will cover the following topics:

  • Learning standard models for supervised learning – decision trees and logistic regression
  • Discussing the staple of unsupervised learning – k-means clustering and its derivatives
  • Understanding metrics and methods to evaluate the effectiveness of the above algorithms
  • Having a glimpse of extending the above methods on special cases of streaming data, sparse data, and non-structured data

Records and supervised learning

For the purpose of this chapter, a record is an observation or measurement of one or several attributes. We assume that the observations might contain noise Records and supervised learning (or be inaccurate for one or other reason):

Records and supervised learning

While we believe that there is some pattern or correlation between the attributes, the one that we are after and want to uncover, the noise is uncorrelated across either the attributes or the records. In statistical terms, we say that the values for each record are drawn from the same distribution and are independent (or i.i.d. in statistical terms). The order of records does not matter. One of the attributes, usually the first, might be designated to be the label.

Supervised learning is when the goal is to predict the label yi:

Records and supervised learning

Here, N is the number of remaining attributes. In other words, the goal is to generalize the patterns so that we can predict the label by just knowing the other attributes, whether because we cannot physically get the measurement or just want to explore the structure of the dataset without having the immediate goal to predict the label.

The unsupervised learning is when we don't use the label—we just try to explore the structure and correlations to understand the dataset to, potentially, predict the label better. The number of problems in this latter category has increased recently with the emergence of learning for unstructured data and streams, each of which, I'll be covering later in the book in separate chapters.

Iris dataset

I will demonstrate the concept of records and labels based on one of the most famous datasets in machine learning, the Iris dataset (https://archive.ics.uci.edu/ml/datasets/Iris). The Iris dataset contains 50 records for each of the three types of Iris flower, 150 lines of total five fields. Each line is a measurement of the following:

  • Sepal length in cm
  • Sepal width in cm
  • Petal length in cm
  • Petal width in cm

With the final field being the type of the flower (setosa, versicolor, or virginica). The classic problem is to predict the label, which, in this case, is a categorical attribute with three possible values as a function of the first four attributes:

Iris dataset

One option would be to draw a plane in the four-dimensional space that separates all four labels. Unfortunately, as one can find out, while one of the classes is clearly separable, the remaining two are not, as shown in the following multidimensional scatterplot (we have used Data Desk software to create it):

Iris dataset

Figure 04-1. The Iris dataset as a three-dimensional plot. The Iris setosa records, shown by crosses, can be separated from the other two types based on petal length and width.

The colors and shapes are assigned according to the following table:

Label

Color

Shape

Iris setosa

Blue

x

Iris versicolor

Green

Vertical bar

Iris virginica

Purple

Horizontal bar

The Iris setosa is separable because it happens to have a very short petal length and width compared to the two other types.

Let's see how we can use MLlib to find that separating multidimensional plane.

Labeled point

The labeled datasets used to have a very special place in ML—we will discuss unsupervised learning later in the chapter, where we do not need a label, so MLlib has a special data type to represent a record with a org.apache.spark.mllib.regression.LabeledPoint label (refer to https://spark.apache.org/docs/latest/mllib-data-types.html#labeled-point). To read the Iris dataset from a text file, we need to transform the original UCI repository file into the so-called LIBSVM text format. While there are plenty of converters from CSV to LIBSVM format, I'd like to use a simple AWK script to do the job:

awk -F, '/setosa/ {print "0 1:"$1" 2:"$2" 3:"$3" 4:"$4;}; /versicolor/ {print "1 1:"$1" 2:"$2" 3:"$3" 4:"$4;}; /virginica/ {print "1 1:"$1" 2:"$2" 3:"$3" 4:"$4;};' iris.csv > iris-libsvm.txt

Note

Why do we need LIBSVM format?

LIBSVM is the format that many libraries use. First, LIBSVM takes only continuous attributes. While a lot of datasets in the real world contain discrete or categorical attributes, internally they are always converted to a numerical representation for efficiency reasons, even if the L1 or L2 metrics on the resulting numerical attribute does not make much sense in the unordered discrete values. Second, the LIBSVM format allows for efficient sparse data representation. While the Iris dataset is not sparse, almost all of the modern big data sources are sparse, and the format allows for efficient storage by only storing the provided values. Many modern big data key-value and traditional RDBMS databases actually do the same for efficiency reasons.

The code might be more complex for missing values, but we know that the Iris dataset is not sparse—otherwise we'd complement our code with a bunch of if statements. We mapped the last two labels to 1 for our purpose now.

SVMWithSGD

Now, let's run the Linear Support Vector Machine (SVM) SVMWithSGD code from MLlib:

$ bin/spark-shell 
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _ / _ / _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_   version 1.6.1
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_40)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.

scala> import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
scala> import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
scala> import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.util.MLUtils
scala> val data = MLUtils.loadLibSVMFile(sc, "iris-libsvm.txt")
data: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[6] at map at MLUtils.scala:112
scala> val splits = data.randomSplit(Array(0.6, 0.4), seed = 123L)
splits: Array[org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint]] = Array(MapPartitionsRDD[7] at randomSplit at <console>:26, MapPartitionsRDD[8] at randomSplit at <console>:26)
scala> val training = splits(0).cache()
training: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[7] at randomSplit at <console>:26
scala> val test = splits(1)
test: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[8] at randomSplit at <console>:26
scala> val numIterations = 100
numIterations: Int = 100
scala> val model = SVMWithSGD.train(training, numIterations)
model: org.apache.spark.mllib.classification.SVMModel = org.apache.spark.mllib.classification.SVMModel: intercept = 0.0, numFeatures = 4, numClasses = 2, threshold = 0.0
scala> model.clearThreshold()
res0: model.type = org.apache.spark.mllib.classification.SVMModel: intercept = 0.0, numFeatures = 4, numClasses = 2, threshold = None
scala> val scoreAndLabels = test.map { point =>
     |   val score = model.predict(point.features)
     |   (score, point.label)
     | }
scoreAndLabels: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[212] at map at <console>:36
scala> val metrics = new BinaryClassificationMetrics(scoreAndLabels)
metrics: org.apache.spark.mllib.evaluation.BinaryClassificationMetrics = org.apache.spark.mllib.evaluation.BinaryClassificationMetrics@692e4a35
scala> val auROC = metrics.areaUnderROC()
auROC: Double = 1.0

scala> println("Area under ROC = " + auROC)
Area under ROC = 1.0
scala> model.save(sc, "model")
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

So, you just run one of the most complex algorithms in the machine learning toolbox: SVM. The result is a separating plane that distinguishes Iris setosa flowers from the other two types. The model in this case is exactly the intercept and the coefficients of the plane that best separates the labels:

scala> model.intercept
res5: Double = 0.0

scala> model.weights
res6: org.apache.spark.mllib.linalg.Vector = [-0.2469448809675877,-1.0692729424287566,1.7500423423258127,0.8105712661836376]

If one looks under the hood, the model is stored in a parquet file, which can be dumped using parquet-tool:

$ parquet-tools dump model/data/part-r-00000-7a86b825-569d-4c80-8796-8ee6972fd3b1.gz.parquet

DOUBLE weights.values.array 
----------------------------------------------------------------------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 4 *** 
value 1: R:0 D:3 V:-0.2469448809675877
value 2: R:1 D:3 V:-1.0692729424287566
value 3: R:1 D:3 V:1.7500423423258127
value 4: R:1 D:3 V:0.8105712661836376

DOUBLE intercept 
----------------------------------------------------------------------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 1 *** 
value 1: R:0 D:1 V:0.0

The Receiver Operating Characteristic (ROC) is a common measure of the classifier to be able to correctly rank the records according to their numeric label. We will consider precision metrics in more detail in Chapter 9, NLP in Scala.

Tip

What is ROC?

ROC has emerged in signal processing with the first application to measure the accuracy of analog radars. The common measure of accuracy is area under ROC, which, shortly, is the probability of two randomly chosen points to be ranked correctly according to their labels (the 0 label should always have a lower rank than the 1 label). AUROC has a number of attractive characteristics:

  • The value, at least theoretically, does not depend on the oversampling rate, that is, the rate at which we see 0 labels as opposed to 1 labels.
  • The value does not depend on the sample size, excluding the expected variance due to the limited sample size.
  • Adding a constant to the final score does not change the ROC, thus the intercept can always be set to 0. Computing the ROC requires a sort with respect to the generated score.

Of course, separating the remaining two labels is a harder problem since the plane that separated Iris versicolor from Iris virginica does not exist: the AUROC score will be less than 1.0. However, the SVM method will find the plane that best differentiates between the latter two classes.

Logistic regression

Logistic regression is one of the oldest classification methods. The outcome of the logistic regression is also a set of weights, which define the hyperplane, but the loss function is logistic loss instead of L2:

Logistic regression

Logit function is a frequent choice when the label is binary (as y = +/- 1 in the above equation):

$ bin/spark-shell 
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _ / _ / _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_   version 1.6.1
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_40)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.

scala> import org.apache.spark.SparkContext
import org.apache.spark.SparkContext
scala> import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, LogisticRegressionModel}
import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, LogisticRegressionModel}
scala> import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.evaluation.MulticlassMetrics
scala> import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.LabeledPoint
scala> import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vectors
scala> import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.util.MLUtils
scala> val data = MLUtils.loadLibSVMFile(sc, "iris-libsvm-3.txt")
data: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[6] at map at MLUtils.scala:112
scala> val splits = data.randomSplit(Array(0.6, 0.4))
splits: Array[org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint]] = Array(MapPartitionsRDD[7] at randomSplit at <console>:29, MapPartitionsRDD[8] at randomSplit at <console>:29)
scala> val training = splits(0).cache()
training: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[7] at randomSplit at <console>:29
scala> val test = splits(1)
test: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[8] at randomSplit at <console>:29
scala> val model = new LogisticRegressionWithLBFGS().setNumClasses(3).run(training)
model: org.apache.spark.mllib.classification.LogisticRegressionModel = org.apache.spark.mllib.classification.LogisticRegressionModel: intercept = 0.0, numFeatures = 8, numClasses = 3, threshold = 0.5
scala> val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
     |   val prediction = model.predict(features)
     |   (prediction, label)
     | }
predictionAndLabels: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[67] at map at <console>:37
scala> val metrics = new MulticlassMetrics(predictionAndLabels)
metrics: org.apache.spark.mllib.evaluation.MulticlassMetrics = org.apache.spark.mllib.evaluation.MulticlassMetrics@6d5254f3
scala> val precision = metrics.precision
precision: Double = 0.9516129032258065
scala> println("Precision = " + precision)
Precision = 0.9516129032258065
scala> model.intercept
res5: Double = 0.0
scala> model.weights
res7: org.apache.spark.mllib.linalg.Vector = [10.644978886788556,-26.850171485157578,3.852594349297618,8.74629386938248,4.288703063075211,-31.029289381858273,9.790312529377474,22.058196856491996]

The labels in this case can be any integer in the range [0, k), where k is the total number of classes (the correct class will be determined by building multiple binary logistic regression models against the pivot class, which in this case, is the class with the 0 label) (The Elements of Statistical Learning by Trevor Hastie, Robert Tibshirani, Jerome Friedman, Springer Series in Statistics).

The accuracy metric is precision, or the percentage of records predicted correctly (which is 95% in our case).

Decision tree

The preceding two methods describe linear models. Unfortunately, the linear approach does not always work for complex interactions between attributes. Assume that the label looks like an exclusive OR: 0 if X ≠ Y and 1 if X = Y:

X

Y

Label

1

0

0

0

1

0

1

1

1

0

0

1

There is no hyperplane that can differentiate between the two labels in the XY space. Recursive split solution, where the split on each level is made on only one variable or a linear combination thereof might work a bit better in these case. Decision trees are also known to work well with sparse and interaction-rich datasets:

$ bin/spark-shell 
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _ / _ / _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_   version 1.6.1
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_40)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.

scala> import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.DecisionTree
scala> import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.mllib.tree.model.DecisionTreeModel
scala> import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.util.MLUtils
scala> import org.apache.spark.mllib.tree.configuration.Strategy
import org.apache.spark.mllib.tree.configuration.Strategy
scala> import org.apache.spark.mllib.tree.configuration.Algo.Classification
import org.apache.spark.mllib.tree.configuration.Algo.Classification
scala> import org.apache.spark.mllib.tree.impurity.{Entropy, Gini}
import org.apache.spark.mllib.tree.impurity.{Entropy, Gini}
scala> val data = MLUtils.loadLibSVMFile(sc, "iris-libsvm-3.txt")
data: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[6] at map at MLUtils.scala:112

scala> val splits = data.randomSplit(Array(0.7, 0.3), 11L)
splits: Array[org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint]] = Array(MapPartitionsRDD[7] at randomSplit at <console>:30, MapPartitionsRDD[8] at randomSplit at <console>:30)
scala> val (trainingData, testData) = (splits(0), splits(1))
trainingData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[7] at randomSplit at <console>:30
testData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[8] at randomSplit at <console>:30
scala> val strategy = new Strategy(Classification, Gini, 10, 3, 10)
strategy: org.apache.spark.mllib.tree.configuration.Strategy = org.apache.spark.mllib.tree.configuration.Strategy@4110e631
scala> val dt = new DecisionTree(strategy)
dt: org.apache.spark.mllib.tree.DecisionTree = org.apache.spark.mllib.tree.DecisionTree@33d89052
scala> val model = dt.run(trainingData)
model: org.apache.spark.mllib.tree.model.DecisionTreeModel = DecisionTreeModel classifier of depth 6 with 21 nodes
scala> val labelAndPreds = testData.map { point =>
     |   val prediction = model.predict(point.features)
     |   (point.label, prediction)
     | }
labelAndPreds: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[32] at map at <console>:36
scala> val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / testData.count()
testErr: Double = 0.02631578947368421
scala> println("Test Error = " + testErr)
Test Error = 0.02631578947368421

scala> println("Learned classification tree model:
" + model.toDebugString)
Learned classification tree model:
DecisionTreeModel classifier of depth 6 with 21 nodes
  If (feature 3 <= 0.4)
   Predict: 0.0
  Else (feature 3 > 0.4)
   If (feature 3 <= 1.7)
    If (feature 2 <= 4.9)
     If (feature 0 <= 5.3)
      If (feature 1 <= 2.8)
       If (feature 2 <= 3.9)
        Predict: 1.0
       Else (feature 2 > 3.9)
        Predict: 2.0
      Else (feature 1 > 2.8)
       Predict: 0.0
     Else (feature 0 > 5.3)
      Predict: 1.0
    Else (feature 2 > 4.9)
     If (feature 0 <= 6.0)
      If (feature 1 <= 2.4)
       Predict: 2.0
      Else (feature 1 > 2.4)
       Predict: 1.0
     Else (feature 0 > 6.0)
      Predict: 2.0
   Else (feature 3 > 1.7)
    If (feature 2 <= 4.9)
     If (feature 1 <= 3.0)
      Predict: 2.0
     Else (feature 1 > 3.0)
      Predict: 1.0
    Else (feature 2 > 4.9)
     Predict: 2.0
scala> model.save(sc, "dt-model")
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

As you can see, the error (misprediction) rate on hold-out 30% sample is only 2.6%. The 30% sample of 150 is only 45 records, which means we missed only 1 record from the whole test set. Certainly, the result might and will change with a different seed, and we need a more rigorous cross-validation technique to prove the accuracy of the model, but this is enough for a rough estimate of model performance.

Decision tree generalizes on regression case, that is, when the label is continuous in nature. In this case, the splitting criterion is minimization of weighted variance, as opposed to entropy gain or gini in the case of classification. I will talk more about the differences in Chapter 5, Regression and Classification.

There are a number of parameters, which can be tuned to improve the performance:

Parameter

Description

Recommended value

maxDepth

This is the maximum depth of the tree. Deep trees are costly and usually are more likely to overfit. Shallow trees are more efficient and better for bagging/boosting algorithms such as AdaBoost.

This depends on the size of the original dataset. It is worth experimenting and plotting the accuracy of the resulting tree versus the parameter to find out the optimum.

minInstancesPerNode

This also limits the size of the tree: once the number of instances falls under this threshold, no further splitting occurs.

The value is usually 10-100, depending on the complexity of the original dataset and the number of potential labels.

maxBins

This is used only for continuous attributes: the number of bins to split the original range.

Large number of bins increase computation and communication cost. One can also consider the option of pre-discretizing the attribute based on domain knowledge.

minInfoGain

This is the amount of information gain (entropy), impurity (gini), or variance (regression) gain to split a node.

The default is 0, but you can increase the default to limit the tree size and reduce the risk of overfitting.

maxMemoryInMB

This is the amount of memory to be used for collecting sufficient statistics.

The default value is conservatively chosen to be 256 MB to allow the decision algorithm to work in most scenarios. Increasing maxMemoryInMB can lead to faster training (if the memory is available) by allowing fewer passes over the data. However, there may be decreasing returns as maxMemoryInMB grows, as the amount of communication on each iteration can be proportional to maxMemoryInMB.

subsamplingRate

This is the fraction of the training data used for learning the decision tree.

This parameter is most relevant for training ensembles of trees (using RandomForest and GradientBoostedTrees), where it can be useful to subsample the original data. For training a single decision tree, this parameter is less useful since the number of training instances is generally not the main constraint.

useNodeIdCache

If this is set to true, the algorithm will avoid passing the current model (tree or trees) to executors on each iteration.

This can be useful with deep trees (speeding up computation on workers) and for large random forests (reducing communication on each iteration).

checkpointDir:

This is the directory for checkpointing the node ID cache RDDs.

This is an optimization to save intermediate results to avoid recomputation in case of node failure. Set it in large clusters or with unreliable nodes.

checkpointInterval

This is the frequency for checkpointing the node ID cache RDDs.

Setting this too low will cause extra overhead from writing to HDFS and setting this too high can cause problems if executors fail and the RDD needs to be recomputed.

Bagging and boosting – ensemble learning methods

As a portfolio of stocks has better characteristics compared to individual equities, models can be combined to produce better classifiers. Usually, these methods work really well with decision trees as the training technique can be modified to produce models with large variations. One way is to train the model on random subsets of the original data or random subsets of attributes, which is called random forest. Another way is to generate a sequence of models, where misclassified instances are reweighted to get a larger weight in each subsequent iteration. It has been shown that this method has a relation to gradient descent methods in the model parameter space. While these are valid and interesting techniques, they usually require much more space in terms of model storage and are less interpretable compared to bare decision tree models. For Spark, the ensemble models are currently under development—the umbrella issue is SPARK-3703 (https://issues.apache.org/jira/browse/SPARK-3703).

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset