Real-world data is usually noisy, inconsistent, and incomplete, with missing observations. Classification, regression, and clustering models struggle to extract relevant information from raw data that has not been cleaned.
Data preprocessing consists of cleaning, filtering, transforming, and normalizing raw observations using statistics in order to correlate features or groups of features, identify trends and models, and filter out noise. The purpose of cleansing raw data is as follows:
You should not underestimate the power of traditional statistical analysis methods to infer and classify information from textual or unstructured data.
In this chapter, you will learn how to:
The overwhelming majority of examples used to illustrate the different machine learning algorithms in this book deal with time series, that is, sequential, time-ordered sets of observations.
The Primitives types section under Source code in Chapter 1, Getting Started, introduced the types for a time series of a single variable, XSeries[T], and of multiple variables, XVSeries[T].
A time series of observations is a vector (a Vector type) of observation elements of one of the following types: the T type in the case of a single variable/feature observation, or the Array[T] type for observations with more than one variable/feature.
A time series of labels or expected values is a single variable vector whose elements may have a primitive Int type for classification and Double for regression.
A time series of labeled observations is a pair of a vector of observations and a vector of labels:
The two generic XSeries and XVSeries types for time series will be used as the two primary classes for the input data from now on.
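The types described above can be sketched as follows. The alias definitions are an assumption inferred from the description (a Vector of T or of Array[T]); the object name and the sample values are purely illustrative.

```scala
object TimeSeriesTypes {
  // Assumed aliases, inferred from the text: a Vector of T or of Array[T]
  type XSeries[T] = Vector[T]
  type XVSeries[T] = Vector[Array[T]]
  type DblVector = Vector[Double]

  // Single-variable time series (hypothetical values)
  val prices: XSeries[Double] = Vector(10.0, 10.5, 10.2)

  // Multi-variable time series: two features per observation
  val features: XVSeries[Double] =
    Vector(Array(10.0, 1.2), Array(10.5, 0.9), Array(10.2, 1.1))

  // Labels: Int for classification (Double would be used for regression)
  val labels: Vector[Int] = Vector(1, 0, 1)

  // Labeled observations: a pair of observations and labels
  val labeled: (XVSeries[Double], Vector[Int]) = (features, labels)
}
```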
The Stats class introduced in the Profiling data section in Chapter 2, Hello World!, implements some basic statistics and normalization for single variable observations. Let's create an XTSeries singleton to compute the statistics and normalize multidimensional observations:
import scala.util.Try

object XTSeries {
  def zipWithShift[T](xv: XSeries[T], n: Int): Vector[(T, T)] =
    xv.drop(n).zip(xv.view.dropRight(n))   //1

  // Same pairing for a shift of a single element
  def zipWithShift1[T](xv: XSeries[T]): Vector[(T, T)] =
    xv.zip(xv.view.drop(1))

  def statistics[T <: AnyVal](xt: XVSeries[T])
      (implicit f: T => Double): Vector[Stats[T]] =
    xt.transpose.map(Stats[T](_))   //2

  def normalize[T <: AnyVal](   //3
      xt: XSeries[T], low: Double, high: Double)
      (implicit ordering: Ordering[T], f: T => Double): Try[DblVector] =
    Try(Stats[T](xt).normalize(low, high))
  ...
}
The first method of the XTSeries singleton generates a vector of pairs of elements by zipping the last size - n elements of a time series with its first size - n elements (line 1). The statistics (line 2) and normalize (line 3) methods operate on both the single and multivariable observations. These three methods are a subset of the functionality implemented in XTSeries.
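To make the pairing concrete, here is a minimal, self-contained sketch of the shift-and-zip operation. The XSeries alias is assumed to be Vector[T], and the strict dropRight is used instead of a view for simplicity.

```scala
object ZipWithShiftSketch {
  type XSeries[T] = Vector[T] // assumed alias

  // Pairs each element x(i + n) with the element x(i) that precedes it by n steps
  def zipWithShift[T](xv: XSeries[T], n: Int): Vector[(T, T)] =
    xv.drop(n).zip(xv.dropRight(n))
}
```

For instance, ZipWithShiftSketch.zipWithShift(Vector(1, 2, 3, 4), 1) returns Vector((2,1), (3,2), (4,3)): the first component of each pair is the later observation.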
Create a time series of the XVSeries[T] type by zipping the two x and y vectors and converting the pair into an array:
def zipToSeries[T: ClassTag](
x: Vector[T], y: Vector[T]): XVSeries[T]
Split a single or multidimensional time series, xv, into two time series at index n:
def splitAt[T](xv: XSeries[T], n: Int): (XSeries[T], XSeries[T])
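One possible implementation of these two signatures is sketched below. The bodies are assumptions (the text gives only the signatures), as are the type aliases.

```scala
import scala.reflect.ClassTag

object SeriesBuilders {
  type XSeries[T] = Vector[T]          // assumed alias
  type XVSeries[T] = Vector[Array[T]]  // assumed alias

  // Each pair (x(i), y(i)) becomes a two-feature observation
  def zipToSeries[T: ClassTag](x: Vector[T], y: Vector[T]): XVSeries[T] =
    x.zip(y).map { case (a, b) => Array[T](a, b) }

  // Delegates to the standard Vector.splitAt
  def splitAt[T](xv: XSeries[T], n: Int): (XSeries[T], XSeries[T]) =
    xv.splitAt(n)
}
```

With this sketch, splitAt(Vector(1, 2, 3, 4), 1) returns (Vector(1), Vector(2, 3, 4)).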
Apply a zScore transform to a single-dimension time series:
def zScore[T <: AnyVal](xt: XSeries[T])
(implicit f: T => Double): Try[DblVector]
Apply a zScore transform to a multidimensional time series:
def zScores[T <: AnyVal](xt: XVSeries[T])
(implicit f: T => Double): Try[XVSeries[Double]]
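As an illustration, the z-score transform subtracts the mean from each value and divides by the standard deviation. The sketch below is specialized to Double and assumes the population standard deviation; the implementation behind the signatures above may differ on both points.

```scala
import scala.util.Try

object ZScoreSketch {
  type DblVector = Vector[Double] // assumed alias

  // z(i) = (x(i) - mean) / stdDev, using the population standard deviation
  def zScore(xt: Vector[Double]): Try[DblVector] = Try {
    require(xt.nonEmpty, "Cannot normalize an empty time series")
    val mean = xt.sum / xt.size
    val variance = xt.map(x => (x - mean) * (x - mean)).sum / xt.size
    val stdDev = math.sqrt(variance)
    require(stdDev > 0.0, "Cannot normalize a constant time series")
    xt.map(x => (x - mean) / stdDev)
  }
}
```

Wrapping the computation in Try turns an empty or constant series into a Failure instead of producing NaN values.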
Transform a single-dimension time series x into a new time series whose elements are x(n) - x(n-1):
def delta(x: DblVector): DblVector
Transform a single-dimension time series x into a new time series whose elements are 1 if x(n) - x(n-1) > 0.0 and 0 otherwise:
def binaryDelta(x: DblVector): Vector[Int]
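A minimal sketch of the two transforms, assuming DblVector is an alias for Vector[Double]; the bodies are illustrative and not necessarily those of the library.

```scala
object DeltaSketch {
  type DblVector = Vector[Double] // assumed alias

  // x(n) - x(n-1) for each pair of consecutive elements
  def delta(x: DblVector): DblVector =
    x.zip(x.drop(1)).map { case (prev, next) => next - prev }

  // 1 if the series increased between two consecutive elements, 0 otherwise
  def binaryDelta(x: DblVector): Vector[Int] =
    delta(x).map(d => if (d > 0.0) 1 else 0)
}
```

Both results are one element shorter than the input: delta(Vector(1.0, 3.0, 2.0)) returns Vector(2.0, -1.0) and binaryDelta returns Vector(1, 0) for the same input.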
Compute the sum of the squared errors between the two x and z arrays:
def sse[T <: AnyVal](x: Array[T], z: Array[T])(implicit f: T => Double): Double
Compute the mean squared error between the two x and z arrays:
def mse[T <: AnyVal](x: Array[T], z: Array[T])(implicit f: T => Double): Double
Compute the mean squared error between the two x and z vectors:
def mse(x: DblVector, z: DblVector): Double
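The two error metrics can be sketched as follows for vectors of Double. This uses the textbook definitions (the sum of squared differences, and that sum divided by the number of elements), which is an assumption: the text does not show the bodies of sse and mse.

```scala
object ErrorMetrics {
  // Sum of squared errors: sum over i of (x(i) - z(i))^2
  def sse(x: Vector[Double], z: Vector[Double]): Double = {
    require(x.size == z.size, "Both series must have the same length")
    x.zip(z).map { case (a, b) => (a - b) * (a - b) }.sum
  }

  // Mean squared error: the sum of squared errors divided by the number of elements
  def mse(x: Vector[Double], z: Vector[Double]): Double = {
    require(x.nonEmpty, "Cannot compute the error of an empty series")
    sse(x, z) / x.size
  }
}
```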
Compute the statistics for each feature of a multidimensional time series:
def statistics[T <: AnyVal](xt: XVSeries[T]) (implicit f: T => Double): Vector[Stats[T]]
Apply an f function to a zipped pair of multidimensional vectors of the XVSeries type:
def zipToVector[T](x: XVSeries[T], y: XVSeries[T])
    (f: (Array[T], Array[T]) => Double): XSeries[Double] =
  x.zip(y.view).map { case (x, y) => f(x, y) }
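As a usage illustration, zipToVector can compute a distance between corresponding observations of two series. The euclidean function and the type aliases below are assumptions introduced for this sketch only.

```scala
object ZipToVectorSketch {
  type XSeries[T] = Vector[T]          // assumed alias
  type XVSeries[T] = Vector[Array[T]]  // assumed alias

  def zipToVector[T](x: XVSeries[T], y: XVSeries[T])
      (f: (Array[T], Array[T]) => Double): XSeries[Double] =
    x.zip(y).map { case (a, b) => f(a, b) }

  // Hypothetical helper: Euclidean distance between two observations
  val euclidean: (Array[Double], Array[Double]) => Double =
    (a, b) => math.sqrt(a.zip(b).map { case (u, v) => (u - v) * (u - v) }.sum)
}
```

Applied to Vector(Array(0.0, 0.0)) and Vector(Array(3.0, 4.0)) with euclidean, the result is Vector(5.0).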
Some operations on time series that are implemented as XTSeries methods may have a large variety of input and output types. Scala and Java support method overloading, which has the following limitations:
Let's consider the transpose operator for any kind of multidimensional time series. The transpose operator can be objectified as the Transpose trait:
sealed trait Transpose {
  type Result   //4
  def apply(): Result   //5
}
The trait has an abstract Result type (line 4) and an abstract apply() method (line 5) that allow us to create a generic transpose method with any combination of input and output types. The conversions for the input and output types of the transpose method are defined as implicits:
implicit def xvSeries2Matrix[T: ClassTag](from: XVSeries[T]) =
  new Transpose {
    type Result = Array[Array[T]]   //6
    def apply(): Result = from.toArray.transpose
  }

implicit def list2Matrix[T: ClassTag](from: List[Array[T]]) =
  new Transpose {
    type Result = Array[Array[T]]   //7
    def apply(): Result = from.toArray.transpose
  }
…
The first xvSeries2Matrix implicit transposes a time series of the XVSeries[T] type into a matrix with elements of the T type (line 6). The list2Matrix implicit transposes a time series of the List[Array[T]] type into a matrix with elements of the T type (line 7).
The generic transpose method is written as follows:
def transpose(tpose: Transpose): tpose.Result = tpose()
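The pieces above can be assembled into a self-contained sketch. The explicit return types on the implicit conversions and the XVSeries alias are additions made here so that the snippet compiles on its own; they are assumptions, not part of the original listing.

```scala
import scala.language.implicitConversions
import scala.reflect.ClassTag

object TransposeMagnet {
  type XVSeries[T] = Vector[Array[T]] // assumed alias

  sealed trait Transpose {
    type Result
    def apply(): Result
  }

  // Conversion from a Vector of observations
  implicit def xvSeries2Matrix[T: ClassTag](
      from: XVSeries[T]): Transpose { type Result = Array[Array[T]] } =
    new Transpose {
      type Result = Array[Array[T]]
      def apply(): Result = from.toArray.transpose
    }

  // Conversion from a List of observations
  implicit def list2Matrix[T: ClassTag](
      from: List[Array[T]]): Transpose { type Result = Array[Array[T]] } =
    new Transpose {
      type Result = Array[Array[T]]
      def apply(): Result = from.toArray.transpose
    }

  // A single method serves every input type that has a conversion
  def transpose(tpose: Transpose): tpose.Result = tpose()
}
```

A caller passes a Vector[Array[Int]] or a List[Array[Int]] directly to transpose; the compiler selects the matching conversion, and the result type follows from the Result member of the selected instance.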
The second candidate for the magnet pattern is the computation of the differential in a time series. The purpose is to generate the time series {x(t+1) - x(t)} from a time series {x(t)}:
sealed trait Difference[T] {
type Result
def apply(f: (Double, Double) => T): Result
}
The Difference trait allows us to compute the differential of a time series with arbitrary element types. For instance, the differential of a one-dimensional vector of the Double type is defined by the following implicit conversion:
implicit def vector2Double[T](x: DblVector) = new Difference[T] {
  type Result = Vector[T]
  def apply(f: (Double, Double) => T): Result =   //8
    zipWithShift(x, 1).collect { case (next, prev) => f(prev, next) }
}
The apply() method takes one argument: the user-defined f function that computes the difference between two consecutive elements of the time series (line 8). The generic difference method is as follows:
def difference[T](
diff: Difference[T],
f: (Double, Double) => T): diff.Result = diff(f)
Here are some of the predefined differential operators of a time series for which the output of the operator has the Double (line 9), Int (line 10), and Boolean (line 11) types:
val diffDouble = (x: Double, y: Double) => y - x   //9
val diffInt = (x: Double, y: Double) => if (y > x) 1 else 0   //10
val diffBoolean = (x: Double, y: Double) => (y > x)   //11
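The trait, the conversion, the generic method, and the operators can be combined into a self-contained sketch. The explicit return type on the conversion and the inlined zip (in place of zipWithShift) are assumptions made so that the snippet stands alone.

```scala
import scala.language.implicitConversions

object DifferenceMagnet {
  type DblVector = Vector[Double] // assumed alias

  sealed trait Difference[T] {
    type Result
    def apply(f: (Double, Double) => T): Result
  }

  implicit def vector2Double[T](
      x: DblVector): Difference[T] { type Result = Vector[T] } =
    new Difference[T] {
      type Result = Vector[T]
      // Applies f to each pair (x(t), x(t+1)) of consecutive elements
      def apply(f: (Double, Double) => T): Result =
        x.zip(x.drop(1)).map { case (prev, next) => f(prev, next) }
    }

  def difference[T](
      diff: Difference[T],
      f: (Double, Double) => T): diff.Result = diff(f)

  val diffDouble = (x: Double, y: Double) => y - x
  val diffInt = (x: Double, y: Double) => if (y > x) 1 else 0
  val diffBoolean = (x: Double, y: Double) => y > x
}
```

For the series Vector(1.0, 3.0, 2.0), difference[Double](series, diffDouble) returns Vector(2.0, -1.0) and difference[Int](series, diffInt) returns Vector(1, 0). The type argument is spelled out in these calls as a cautious choice that keeps type inference simple.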
The differential operator is used to implement the differentialData method, which generates labeled data from observations with two features and a target (labels) dataset:
def differentialData[T](
x: DblVector,
y: DblVector,
target: DblVector,
f: (Double,Double) =>T): Try[(XVSeries[Double],Vector[T])] =
Try((zipToSeries(x,y), difference(target, f)))
The structure of the labeled data is the pair of observations and the differential of target values.
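A standalone sketch of this construction, written with plain collection operations instead of the magnet-based difference method, is shown below. The length check and the type aliases are assumptions added for illustration.

```scala
import scala.util.Try

object LabeledDataSketch {
  type DblVector = Vector[Double]      // assumed alias
  type XVSeries[T] = Vector[Array[T]]  // assumed alias

  def differentialData[T](
      x: DblVector,
      y: DblVector,
      target: DblVector,
      f: (Double, Double) => T): Try[(XVSeries[Double], Vector[T])] = Try {
    require(x.size == y.size && x.size == target.size,
      "All series must have the same length")
    // Two-feature observations built from x and y
    val observations: XVSeries[Double] =
      x.zip(y).map { case (a, b) => Array(a, b) }
    // Labels derived from consecutive target values
    val labels = target.zip(target.drop(1)).map { case (prev, next) => f(prev, next) }
    (observations, labels)
  }
}
```

Note that the differential has one element fewer than the target series, so the labels vector is one element shorter than the observations vector.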
A view in Scala is a proxy collection that represents a collection but implements the data transformation or higher-order method lazily. The elements of a view are defined as lazy values, which are instantiated on demand.
One important advantage of views over a strict (or fully allocated) collection is the reduced memory consumption.
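The difference between a strict collection and a view can be observed by counting how many elements a transformation touches. In the sketch below (all names are hypothetical), both functions search for the first square above a threshold and report the number of elements that were actually squared.

```scala
object LazyViewSketch {
  // Strict: map squares every element and allocates a full Vector before the search
  def strictSearch(xs: Vector[Int], threshold: Int): (Option[Int], Int) = {
    var evaluated = 0
    val squares = xs.map { x => evaluated += 1; x * x }
    val found = squares.find(_ > threshold)
    (found, evaluated)
  }

  // Lazy: the view squares elements on demand, so the search stops early
  def lazySearch(xs: Vector[Int], threshold: Int): (Option[Int], Int) = {
    var evaluated = 0
    val squares = xs.view.map { x => evaluated += 1; x * x }
    val found = squares.find(_ > threshold)
    (found, evaluated)
  }
}
```

On a vector of 1,000 integers starting at 1 and a threshold of 10, both functions find 16, but the strict version squares all 1,000 elements while the lazy version squares only the first few.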
Let's take a look at the aggregator data transformation introduced in the Instantiating the workflow section under A workflow computational model in Chapter 2, Hello World!. There is no need to allocate the entire set of x.size elements: the higher-order find method may exit after only a few elements have been read (line 12):
val aggregator = new ETransform[Int](splits) {
  override def |> : PartialFunction[U, Try[V]] = {
    case x: U if(!x.isEmpty) =>
      Try(Range(0, x.size).view.find(x(_) == 1.0).get)   //12
  }
}
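The core of line 12 can be isolated from the ETransform machinery as a small function. The name firstIndexOf is hypothetical; the sketch only illustrates the lazy search over a range of indices.

```scala
import scala.util.Try

object FirstMatchSketch {
  // Index of the first element equal to value; a Failure if there is none.
  // The view over the index range is traversed only up to the first match.
  def firstIndexOf(x: Vector[Double], value: Double): Try[Int] =
    Try(Range(0, x.size).view.find(x(_) == value).get)
}
```

Because find returns an Option, calling get on an unsuccessful search throws an exception, which Try converts into a Failure.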
Views, iterators, and streams
Views, iterators, and streams share the same objective of constructing elements on demand. There are, however, some major differences:
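One such difference can be sketched with the standard library alone: a view can be traversed several times and recomputes its elements on each traversal, whereas an iterator can be consumed only once. The function names below are hypothetical, and streams are left out of this sketch because Stream was deprecated in favor of LazyList in Scala 2.13.

```scala
object OnDemandSketch {
  // Counts how many times the mapping function runs over two traversals of a view
  def viewEvaluations(): Int = {
    var evaluations = 0
    val doubled = Vector(1, 2, 3).view.map { x => evaluations += 1; x * 2 }
    doubled.foreach(_ => ()) // first traversal computes the three elements
    doubled.foreach(_ => ()) // second traversal computes them again
    evaluations
  }

  // An iterator is single-pass: once consumed, it has no elements left
  def iteratorIsExhausted(): Boolean = {
    val it = Iterator(1, 2, 3).map(_ * 2)
    while (it.hasNext) it.next()
    !it.hasNext
  }
}
```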