Stream transformation

The input stream exposes an interface similar to that of a Spark Dataset; thus, it can be transformed via the regular SQL interface or by machine learning transformers. In our case, we will reuse all the trained models and transformations that were saved in the previous sections.

First, we will load empTitleTransformer. It is a regular Spark pipeline transformer that can be loaded with the help of the Spark PipelineModel class:

val empTitleTransformer = PipelineModel.load(s"${modelDir}/empTitleTransformer")

The loanStatus and intRate models were saved in the H2O MOJO format. To load them, it is necessary to use the MojoModel class:

val loanStatusModel = MojoModel.load(new File(s"${modelDir}/loanStatusModel.mojo").getAbsolutePath)
val intRateModel = MojoModel.load(new File(s"${modelDir}/intRateModel.mojo").getAbsolutePath)

At this point, we have all the necessary artifacts ready; however, we cannot use H2O MOJO models directly to transform Spark streams. Instead, we can wrap them in a Spark transformer. We already defined a transformer called UDFTransformer in Chapter 4, Predicting Movie Reviews Using NLP and Spark Streaming, so we will follow a similar pattern:

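import hex.ModelCategory
import hex.genmodel.MojoModel
import hex.genmodel.easy.{EasyPredictModelWrapper, RowData}
import hex.genmodel.easy.prediction.{AbstractPrediction, BinomialModelPrediction, RegressionModelPrediction}
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions.{col, struct, udf}
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
import scala.language.implicitConversions

class MojoTransformer(override val uid: String,
                      mojoModel: MojoModel) extends Transformer {

  // Typed results returned by the UDFs; Spark encodes each one as a struct
  case class BinomialPrediction(p0: Double, p1: Double)
  case class RegressionPrediction(value: Double)

  // Implicit conversions from the generic MOJO prediction to the typed results
  implicit def toBinomialPrediction(bmp: AbstractPrediction): BinomialPrediction =
    BinomialPrediction(bmp.asInstanceOf[BinomialModelPrediction].classProbabilities(0),
                       bmp.asInstanceOf[BinomialModelPrediction].classProbabilities(1))
  implicit def toRegressionPrediction(rmp: AbstractPrediction): RegressionPrediction =
    RegressionPrediction(rmp.asInstanceOf[RegressionModelPrediction].value)

  // UDF that converts a Row to RowData, calls the MOJO, and converts the prediction back
  val modelUdf = {
    val epmw = new EasyPredictModelWrapper(mojoModel)
    mojoModel._category match {
      case ModelCategory.Binomial =>
        udf[BinomialPrediction, Row] { r: Row => epmw.predict(rowToRowData(r)) }
      case ModelCategory.Regression =>
        udf[RegressionPrediction, Row] { r: Row => epmw.predict(rowToRowData(r)) }
    }
  }

  // Fields of the prediction struct; the names mirror the case classes above
  val predictStruct = mojoModel._category match {
    case ModelCategory.Binomial =>
      StructField("p0", DoubleType) :: StructField("p1", DoubleType) :: Nil
    case ModelCategory.Regression =>
      StructField("value", DoubleType) :: Nil
  }

  val outputCol = s"${uid}Prediction"

  // Pass all input columns to the UDF as one struct and append the prediction column
  override def transform(dataset: Dataset[_]): DataFrame = {
    val inputSchema = dataset.schema
    val args = inputSchema.fields.map(f => dataset(f.name))
    dataset.select(col("*"), modelUdf(struct(args: _*)).as(outputCol))
  }

  // Convert a Spark Row into the RowData map expected by the MOJO wrapper
  private def rowToRowData(row: Row): RowData = new RowData {
    row.schema.fields.foreach(f => {
      row.getAs[AnyRef](f.name) match {
        case v: Number => put(f.name, v.doubleValue().asInstanceOf[Object])
        case v: java.sql.Timestamp => put(f.name, v.getTime.toDouble.asInstanceOf[Object])
        case null => // nop: a missing value is simply left out of RowData
        case v => put(f.name, v)
      }
    })
  }

  override def copy(extra: ParamMap): Transformer = defaultCopy(extra)

  override def transformSchema(schema: StructType): StructType = {
    val outputFields = schema.fields :+ StructField(outputCol, StructType(predictStruct), false)
    StructType(outputFields)
  }
}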

The MojoTransformer defined above supports binomial and regression MOJO models. It accepts a Spark dataset and enriches it with a new struct column named after the transformer uid (for example, loanStatusPrediction). For binomial models, the struct holds the two class probabilities, p0 and p1; for regression models, it holds a single field with the predicted value. This is reflected in the transform method, which uses the MOJO wrapper modelUdf to transform the input dataset:

dataset.select(col("*"), modelUdf(struct(args: _*)).as(outputCol))

The modelUdf function performs three steps: it converts the data from a Spark Row into the format accepted by MOJO, calls the MOJO model, and converts the MOJO prediction back into a Spark Row format.
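The first of these steps, the conversion of row values, can be tried in isolation. The following is a minimal sketch in plain Scala, not the code used by the transformer: it uses a java.util.HashMap as a stand-in for H2O's RowData (which is itself a HashMap of String to Object), and the object name RowDataSketch, the helper toRowData, and the field names in the usage note are illustrative assumptions.

```scala
import java.sql.Timestamp
import java.util.{HashMap => JHashMap}

object RowDataSketch {
  // Stand-in for H2O's RowData; an assumption made so the sketch needs no H2O dependency.
  type RowDataLike = JHashMap[String, AnyRef]

  // Applies the same rules as rowToRowData to a list of (field name, value) pairs.
  def toRowData(fields: Seq[(String, Any)]): RowDataLike = {
    val rd = new RowDataLike()
    fields.foreach {
      // Every numeric type is widened to a boxed Double
      case (name, v: Number)    => rd.put(name, Double.box(v.doubleValue()))
      // Timestamps become epoch milliseconds, also as a Double
      case (name, v: Timestamp) => rd.put(name, Double.box(v.getTime.toDouble))
      // Missing values are skipped, so the key is absent from the map
      case (_, null)            => ()
      // Anything else (for example, a String category) is passed through unchanged
      case (name, v)            => rd.put(name, v.asInstanceOf[AnyRef])
    }
    rd
  }
}
```

For example, calling RowDataSketch.toRowData with the hypothetical fields "loan_amnt" -> 5000, "desc" -> null, and "grade" -> "B" yields a map with two entries: the amount stored as a Double and the grade stored as a String.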

The defined MojoTransformer allows us to wrap the loaded MOJO models into the Spark transformer API:

val loanStatusTransformer = new MojoTransformer("loanStatus", loanStatusModel)
val intRateTransformer = new MojoTransformer("intRate", intRateModel)

At this point, we have all the necessary building blocks ready, and we can apply them to the input stream:

val outputDataStream =
  intRateTransformer.transform(
    loanStatusTransformer.transform(
      empTitleTransformer.transform(
        Chapter8Library.basicDataCleanup(inputDataStream))
        .withColumn("desc_denominating_words", descWordEncoderUdf(col("desc")))))
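Nested calls such as these are evaluated from the innermost call outward, which can hide the order in which the steps run. As an alternative way to make that order explicit, here is a minimal sketch of a generic left-to-right step chain. The names StepChain and applyInOrder are hypothetical, and the sketch is not part of the shared library.

```scala
object StepChain {
  // Feeds the input through each step in turn; the first step in the list runs first.
  def applyInOrder[A](input: A, steps: Seq[A => A]): A =
    steps.foldLeft(input)((acc, step) => step(acc))
}

// Hypothetical usage with the building blocks of this section, assuming each
// step can be written as a DataFrame => DataFrame function:
//
//   val steps: Seq[DataFrame => DataFrame] = Seq(
//     df => Chapter8Library.basicDataCleanup(df),
//     df => empTitleTransformer.transform(df),
//     df => df.withColumn("desc_denominating_words", descWordEncoderUdf(col("desc"))),
//     df => loanStatusTransformer.transform(df),
//     df => intRateTransformer.transform(df))
//   val outputDataStream = StepChain.applyInOrder(inputDataStream, steps)
```

Listing the steps this way mirrors the order of the data flow: cleanup, empTitle transformation, desc encoding, and finally the two model predictions.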

The expression that builds outputDataStream first calls the shared library function basicDataCleanup and applies empTitleTransformer; it then transforms the desc column with another shared library function, descWordEncoderUdf. Both library functions are implemented on top of the Spark Dataset SQL interface. The remaining steps apply the MOJO transformers defined above. Again, we can explore the structure of the transformed stream and verify that it contains the fields introduced by our transformations:

outputDataStream.schema.printTreeString()

The output is as follows:

 

We can see that there are several new fields in the schema: the representation of the empTitle cluster, the vector of denominating words, and the model predictions. The probabilities come from the loan status model and the real value from the interest rate model.
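Because each MojoTransformer stores its result in a single struct column, individual predictions can be read with dotted column paths. The following is a minimal sketch on a small static DataFrame rather than on the stream itself. The column names loanStatusPrediction and intRatePrediction and the fields p1 and value follow from the transformer code, whereas the Scored record, the id column, the sample values, the output column names, and the local Spark session are assumptions made for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Stand-ins for the prediction structs produced by MojoTransformer
case class BinomialPrediction(p0: Double, p1: Double)
case class RegressionPrediction(value: Double)
// Hypothetical scored record; `id` is an illustrative column
case class Scored(id: Int,
                  loanStatusPrediction: BinomialPrediction,
                  intRatePrediction: RegressionPrediction)

object PredictionFields {
  // Pulls selected nested prediction fields up into top-level columns
  def flatten(scored: DataFrame): DataFrame =
    scored.select(
      col("id"),
      col("loanStatusPrediction.p1").as("loanStatus_p1"),
      col("intRatePrediction.value").as("intRate_value"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("prediction-fields")
      .getOrCreate()
    import spark.implicits._

    // Made-up values, used only to exercise the select
    val scored = Seq(
      Scored(1, BinomialPrediction(0.8, 0.2), RegressionPrediction(11.5))).toDF()
    flatten(scored).show()
    spark.stop()
  }
}
```

The same select call should also be applicable to outputDataStream, since a streaming DataFrame supports the same column expressions.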
