Predicting the chances of infant survival with ML

In this section, we will use a portion of the dataset from the previous chapter to present the ideas of PySpark ML.

Note

If you have not yet downloaded the data while reading the previous chapter, it can be accessed here: http://www.tomdrabas.com/data/LearningPySpark/births_transformed.csv.gz.

In this section, we will, once again, attempt to predict the chances of the survival of an infant.

Loading the data

First, we load the data with the help of the following code:

import pyspark.sql.types as typ
labels = [
    ('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),
    ('BIRTH_PLACE', typ.StringType()),
    ('MOTHER_AGE_YEARS', typ.IntegerType()),
    ('FATHER_COMBINED_AGE', typ.IntegerType()),
    ('CIG_BEFORE', typ.IntegerType()),
    ('CIG_1_TRI', typ.IntegerType()),
    ('CIG_2_TRI', typ.IntegerType()),
    ('CIG_3_TRI', typ.IntegerType()),
    ('MOTHER_HEIGHT_IN', typ.IntegerType()),
    ('MOTHER_PRE_WEIGHT', typ.IntegerType()),
    ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
    ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
    ('DIABETES_PRE', typ.IntegerType()),
    ('DIABETES_GEST', typ.IntegerType()),
    ('HYP_TENS_PRE', typ.IntegerType()),
    ('HYP_TENS_GEST', typ.IntegerType()),
    ('PREV_BIRTH_PRETERM', typ.IntegerType())
]
schema = typ.StructType([
    typ.StructField(e[0], e[1], False) for e in labels
])
births = spark.read.csv('births_transformed.csv.gz', 
                        header=True, 
                        schema=schema)

We specify the schema of the DataFrame; our severely limited dataset now only has 17 columns.
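If you want to confirm that the schema was applied as declared, a quick, optional check is to print it:

# Optional sanity check: the printed schema should list the 17 columns
# with the types declared in the labels list.
births.printSchema()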

Creating transformers

Before we can use the dataset to estimate a model, we need to do some transformations. Since statistical models can only operate on numeric data, we will have to encode the BIRTH_PLACE variable.

Before we do any of this, since we will use a number of different feature transformations later in this chapter, let's import them all:

import pyspark.ml.feature as ft

To encode the BIRTH_PLACE column, we will use the OneHotEncoder transformer. However, it cannot accept StringType columns; it can only deal with numeric types, so first we will cast the column to an IntegerType:

births = births \
    .withColumn('BIRTH_PLACE_INT', 
                births['BIRTH_PLACE'].cast(typ.IntegerType()))

Having done this, we can now create our first Transformer:

encoder = ft.OneHotEncoder(
    inputCol='BIRTH_PLACE_INT', 
    outputCol='BIRTH_PLACE_VEC')

Let's now create a single column with all the features collated together. We will use the VectorAssembler transformer:

featuresCreator = ft.VectorAssembler(
    inputCols=[
        col[0] 
        for col 
        in labels[2:]] + 
    [encoder.getOutputCol()], 
    outputCol='features'
)

The inputCols parameter passed to the VectorAssembler object is a list of all the columns to be combined together to form the outputCol—the 'features'. Note that we use the output of the encoder object (by calling the .getOutputCol() method), so we do not have to remember to change this parameter's value should we change the name of the output column in the encoder object at any point.
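If you would like to peek at what the assembled vector looks like before wiring everything into a Pipeline, here is a minimal, optional sketch (it assumes the Spark 2.x API used in this chapter, where OneHotEncoder is a plain Transformer that can be applied directly):

# Apply the encoder and the assembler by hand and inspect a few rows;
# the Pipeline built later in this section does this for us.
encoded = encoder.transform(births)
featuresCreator.transform(encoded) \
    .select('features') \
    .show(3, truncate=False)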

It's now time to create our first estimator.

Creating an estimator

In this example, we will (once again) use the logistic regression model. However, later in the chapter, we will showcase some more complex models from the .classification module of PySpark ML, so we import the whole module:

import pyspark.ml.classification as cl

Once loaded, let's create the model by using the following code:

logistic = cl.LogisticRegression(
    maxIter=10, 
    regParam=0.01, 
    labelCol='INFANT_ALIVE_AT_REPORT')

We would not have to specify the labelCol parameter if our target column had the name 'label'. Also, if the output of our featuresCreator was not called 'features', we would have to specify the featuresCol by (most conveniently) calling the getOutputCol() method on the featuresCreator object.
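For illustration, here is how the same estimator would look with the featuresCol spelled out explicitly via the assembler's getOutputCol() method (redundant here, since the default name 'features' already matches):

# Equivalent definition with featuresCol set explicitly.
logistic = cl.LogisticRegression(
    maxIter=10, 
    regParam=0.01, 
    featuresCol=featuresCreator.getOutputCol(),
    labelCol='INFANT_ALIVE_AT_REPORT')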

Creating a pipeline

All that is left now is to create a Pipeline and fit the model. First, let's load the Pipeline from the ML package:

from pyspark.ml import Pipeline

Creating a Pipeline is really easy. Here's what our pipeline looks like conceptually:

[Figure: a conceptual view of the pipeline stages]

Converting this structure into a Pipeline is a walk in the park:

pipeline = Pipeline(stages=[
        encoder, 
        featuresCreator, 
        logistic
    ])

That's it! Our pipeline is now created so we can (finally!) estimate the model.

Fitting the model

Before we fit the model, we need to split our dataset into training and testing datasets. Conveniently, the DataFrame API has the .randomSplit(...) method:

births_train, births_test = births \
    .randomSplit([0.7, 0.3], seed=666)

The first parameter is a list of dataset proportions that should end up in, respectively, births_train and births_test subsets. The seed parameter provides a seed to the randomizer.

Note

You can also split the dataset into more than two subsets as long as the elements of the list sum up to 1, and you unpack the output into the corresponding number of subsets.

For example, we could split the births dataset into three subsets like this:

train, test, val = births \
    .randomSplit([0.7, 0.2, 0.1], seed=666)

The preceding code would put a random 70% of the births dataset into the train object, 20% would go to the test, and the val DataFrame would hold the remaining 10%.
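Because the split is random, the resulting counts only approximate the requested proportions; if you want to verify them, a quick, optional check is:

# The counts will roughly follow the 70/20/10 proportions, but will
# not match them exactly because the split is random.
print(train.count(), test.count(), val.count())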

Now it is about time to finally run our pipeline and estimate our model:

model = pipeline.fit(births_train)
test_model = model.transform(births_test)

The .fit(...) method of the pipeline object takes our training dataset as an input. Under the hood, the births_train dataset is passed first to the encoder object. The DataFrame that is created at the encoder stage then gets passed to the featuresCreator that creates the 'features' column. Finally, the output from this stage is passed to the logistic object that estimates the final model.
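To make that flow concrete, here is a rough sketch of the equivalent manual chaining (again assuming the Spark 2.x API, where OneHotEncoder is a plain Transformer); in practice, you would let the Pipeline manage this for you:

# Roughly what the Pipeline does behind the scenes (sketch only).
encoded_train = encoder.transform(births_train)
features_train = featuresCreator.transform(encoded_train)
logistic_model = logistic.fit(features_train)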

The .fit(...) method returns the PipelineModel object (the model object in the preceding snippet) that can then be used for prediction; we obtain the predictions by calling the .transform(...) method and passing the testing dataset created earlier. Here's what test_model looks like when we run the following command:

test_model.take(1)

It generates the following output:

[Figure: the output of test_model.take(1)]

As you can see, we get all the columns from the Transformers and Estimators. The logistic regression model outputs several columns: rawPrediction is the value of the linear combination of the features and the β coefficients, probability is the calculated probability for each of the classes, and finally, prediction is our final class assignment.
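To look at just the columns the model appended, you can select them explicitly, for example:

# Show only the label and the columns added by the logistic
# regression model.
test_model.select(
    'INFANT_ALIVE_AT_REPORT',
    'rawPrediction',
    'probability',
    'prediction'
).show(3, truncate=False)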

Evaluating the performance of the model

Obviously, we would like to now test how well our model did. PySpark exposes a number of evaluation methods for classification and regression in the .evaluation section of the package:

import pyspark.ml.evaluation as ev

We will use the BinaryClassificationEvaluator to test how well our model performed:

evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability', 
    labelCol='INFANT_ALIVE_AT_REPORT')

The rawPredictionCol can either be the rawPrediction column produced by the estimator or the probability column.
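For example, an evaluator pointed at the rawPrediction column would be defined as follows (the evaluator_raw name is just illustrative); since the probability is a monotonic function of the raw prediction, the ranking-based area-under-curve metrics come out the same either way:

# The same evaluator, fed the rawPrediction column instead of
# the probability column.
evaluator_raw = ev.BinaryClassificationEvaluator(
    rawPredictionCol='rawPrediction', 
    labelCol='INFANT_ALIVE_AT_REPORT')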

Let's see how well our model performed:

print(evaluator.evaluate(test_model, 
    {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(test_model, 
    {evaluator.metricName: 'areaUnderPR'}))

The preceding code produces the following result:

[Figure: the evaluation output showing the area under ROC and the area under PR]

The area under the ROC of 74% and the area under the PR of 71% show a well-defined model, but nothing extraordinary; if we had other features, we could drive this up, but that is not the purpose of this chapter (nor of the book, for that matter).

Saving the model

PySpark allows you to save the Pipeline definition for later use. It not only saves the pipeline structure, but also the definitions of all the Transformers and Estimators:

pipelinePath = './infant_oneHotEncoder_Logistic_Pipeline'
pipeline.write().overwrite().save(pipelinePath)

So, you can load it up later and use it straight away to .fit(...) and predict:

loadedPipeline = Pipeline.load(pipelinePath)
loadedPipeline \
    .fit(births_train) \
    .transform(births_test) \
    .take(1)

The preceding code produces the same result (as expected):

[Figure: the output of the reloaded pipeline, the same as test_model.take(1)]

However, if you want to save the estimated model, you can also do that; instead of saving the Pipeline, you need to save the PipelineModel.

Tip

Note that it is not only the PipelineModel that can be saved: virtually all the models returned by calling the .fit(...) method on an Estimator can be saved and loaded back to be reused. A short sketch at the end of this section shows this for the fitted logistic regression stage alone.

To save your model, see the following example:

from pyspark.ml import PipelineModel

modelPath = './infant_oneHotEncoder_Logistic_PipelineModel'
model.write().overwrite().save(modelPath)

loadedPipelineModel = PipelineModel.load(modelPath)
test_reloadedModel = loadedPipelineModel.transform(births_test)

The preceding script uses the .load(...) method, a class method of the PipelineModel class, to reload the estimated model. You can compare the result of test_reloadedModel.take(1) with the output of test_model.take(1) we presented earlier.
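As promised in the earlier tip, here is a sketch of saving just one fitted stage: it pulls the fitted logistic regression model out of the PipelineModel (the stage index assumes the three-stage pipeline defined earlier, and the path is just an illustrative choice):

# Extract the fitted logistic regression stage from the PipelineModel
# and save it on its own; the path below is a hypothetical example.
lrModel = model.stages[-1]
lrModelPath = './infant_logistic_model'
lrModel.write().overwrite().save(lrModelPath)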
