Chapter 7. Training a Machine Learning Model

In Chapter 5, we learned how to prepare and clean up our data, which is the first step in the machine learning pipeline. Now let’s take a deep dive into how to use our data to train a machine learning model.

Training is often considered to be the “bulk” of the work in machine learning. Our goal is to create a function (the “model”) that can accurately predict results on data it hasn’t seen before. Intuitively, model training is very much like how humans learn a new skill—we observe, practice, correct our mistakes, and gradually improve. In machine learning, we start with an initial model that might not be very good at its job. We then put the model through a series of “training steps,” where training data is fed to the model. At each training step, we compare the predictions produced by our model with the “true” results and see how well our model performed. We then tinker with the parameters of the model (for example, by changing how much weight is given to each feature) to attempt to improve its accuracy. A good model is one that makes accurate predictions without overfitting to a specific set of inputs.
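
As a minimal, hypothetical sketch of this loop (a toy one-parameter model, not the recommender we build later), a single-weight gradient-descent trainer might look like this:

import numpy as np

# Toy data: y is roughly 3 * x, and we want the model to learn the weight 3.
x = np.array([1.0, 2.0, 3.0, 4.0])
y = np.array([3.1, 5.9, 9.2, 11.8])

w = 0.0               # initial model parameter (a bad guess)
learning_rate = 0.01

for step in range(200):            # each pass is one "training step"
    predictions = w * x            # the model's predictions
    error = predictions - y        # compare with the "true" results
    gradient = 2 * np.mean(error * x)
    w -= learning_rate * gradient  # tinker with the parameter to improve

print(w)  # ends up close to 3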

In this chapter, we are going to learn how to train machine learning models using two different libraries—TensorFlow and Scikit-learn. TensorFlow has native, first-class support in Kubeflow, while Scikit-learn does not. But as we will see in this chapter, both libraries can be easily integrated with Kubeflow. We’ll demonstrate how you can experiment with models in Kubeflow’s notebooks, and how you can deploy these models to production environments.

Building a Recommender with TensorFlow

Let us first take a look at TensorFlow—an open source framework for machine learning developed by Google. It is currently one of the most popular libraries for machine learning-powered applications, in particular for implementing deep learning. TensorFlow has great support for computational tasks on a variety of hardware, including CPUs, GPUs, and TPUs. We chose TensorFlow for this tutorial because its high-level APIs are user-friendly and abstract away many of the gory details.

Let’s get acquainted with TensorFlow through a simple tutorial. In Chapter 1 we introduced our case studies, one of which is a product recommendation system for customers. In this chapter, we will implement this system with TensorFlow. Specifically, we will do two things:

  1. Use TensorFlow to train a model for product recommendation.

  2. Use Kubeflow to wrap the training code and deploy it to a production cluster.

TensorFlow’s high-level Keras API makes it relatively easy to implement our model. In fact, the bulk of the model can be implemented with less than 50 lines of Python code.

Tip

Keras is the high-level TensorFlow API for deep learning models. It has a user-friendly interface and is highly extensible. As an added bonus, Keras ships with many common neural network implementations out of the box, so you can get a model up and running right away.

Let’s begin by selecting a model for our recommender. We begin with a simple assumption—that if two people (Alice and Bob) have similar opinions on a set of products, then they are also more likely to think similarly on other products. In other words, Alice is more likely to have the same preferences as Bob than a randomly chosen third person is. Thus, we can build a recommendation model using just the users’ purchase history. This is the idea of collaborative filtering—we collect preferential information from many users (hence “collaborative”) and use this data to make selective predictions (hence “filtering”).
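
To make this intuition concrete, here is a toy, made-up sketch (not our real dataset) that compares users’ purchase/rating vectors with cosine similarity—users with similar histories score close to 1:

import numpy as np

# Rows are users (Alice, Bob, Carol); columns are products. 0 means "not purchased".
ratings = np.array([
    [5, 4, 0, 1],   # Alice
    [5, 5, 0, 1],   # Bob
    [1, 0, 5, 4],   # Carol
])

def cosine(u, v):
    return np.dot(u, v) / (np.linalg.norm(u) * np.linalg.norm(v))

print(cosine(ratings[0], ratings[1]))  # Alice vs. Bob: high similarity (~0.99)
print(cosine(ratings[0], ratings[2]))  # Alice vs. Carol: low similarity (~0.21)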

To build this recommender model, we will need a few things:

Users’ purchasing history

We will use the example input data from this GitHub repo: https://github.com/moorissa/medium/blob/master/items-recommender/data.

Data storage

To make sure that our model works across different platforms, we’ll use Minio as the storage system.

Training model

The implementation that we are using is based on a Keras model on GitHub.

We will first experiment with this model using Kubeflow’s notebook servers, and then deploy the training job to our cluster using Kubeflow’s TFJob APIs.

Getting Started

Let’s get started by downloading the prerequisites.1 To run the notebook, you will need a running Kubeflow cluster that includes a Minio service. Check out the “Support Components” section in Chapter 3 to configure Minio. Make sure that the Minio client (“mc”) is also installed.

We also need to prepare the data for training2 and use the Minio client to create the storage objects:

# Port-forward the Minio service to http://localhost:9000
kubectl port-forward -n kubeflow svc/minio-service 9000:9000 &

# Configure Minio host
mc config host add minio http://localhost:9000 minio minio123

# Create storage bucket
mc mb minio/data

# Copy storage objects
mc cp go/src/github.com/medium/items-recommender/data/recommend_1.csv \
        minio/data/recommender/users.csv
mc cp go/src/github.com/medium/items-recommender/data/trx_data.csv \
        minio/data/recommender/transactions.csv

Starting a New Notebook Session

Now let’s start by creating a new notebook. You can do this by navigating to the “Notebook Servers” panel in your Kubeflow dashboard, then clicking “New Server” and following the on-screen instructions. For this example, we use the tensorflow-1.15.2-notebook-cpu:1.0 image.3

When the notebook server starts up, click on the “Upload” button in the top right corner and upload the Recommender_Kubeflow.ipynb file. Click on the file to start a new session.

The first few sections of the code involve importing libraries and reading the training data from Minio. Then we normalize the input data so that we are ready to start training. This process, called feature preparation, was discussed in Chapter 5. In this chapter we’ll focus on the model training part of the exercise.
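
As a rough sketch of what those data-loading cells do (the exact code is in the notebook; the bucket and object names match the mc cp commands above, and the in-cluster endpoint is an assumption based on the Minio setup in Chapter 3):

import pandas as pd
from minio import Minio

# Connect to the in-cluster Minio service (credentials match the mc config above).
minio_client = Minio('minio-service.kubeflow.svc.cluster.local:9000',
                     access_key='minio', secret_key='minio123', secure=False)

# Download the objects we copied into the "data" bucket earlier.
minio_client.fget_object('data', 'recommender/users.csv', 'users.csv')
minio_client.fget_object('data', 'recommender/transactions.csv', 'transactions.csv')

customers = pd.read_csv('users.csv')
transactions = pd.read_csv('transactions.csv')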

TensorFlow Training

Now that our notebook is set up and the data is prepared, we can create a TensorFlow session:4

# create TF session and set it in Keras
# (assumes the notebook's earlier cells imported tensorflow as tf
#  and the Keras backend as K)
sess = tf.Session()
K.set_session(sess)
K.set_learning_phase(1)

For the model class, we use the following code for collaborative filtering:

class DeepCollaborativeFiltering(Model):
    def __init__(self, n_customers, n_products, n_factors, p_dropout = 0.2):
        x1 = Input(shape = (1,), name="user")

        P = Embedding(n_customers, n_factors, input_length = 1)(x1)
        P = Reshape((n_factors,))(P)

        x2 = Input(shape = (1,), name="product")

        Q = Embedding(n_products, n_factors, input_length = 1)(x2)
        Q = Reshape((n_factors,))(Q)

        x = concatenate([P, Q], axis=1)
        x = Dropout(p_dropout)(x)

        x = Dense(n_factors)(x)
        x = Activation('relu')(x)
        x = Dropout(p_dropout)(x)

        output = Dense(1)(x)

        super(DeepCollaborativeFiltering, self).__init__([x1, x2], output)

    def rate(self, customer_idxs, product_idxs):
        if (type(customer_idxs) == int and type(product_idxs) == int):
            return self.predict([
                np.array(customer_idxs).reshape((1,)),
                np.array(product_idxs).reshape((1,))])

        if (type(customer_idxs) == str and type(product_idxs) == str):
            return self.predict([
                np.array(customerMapping[customer_idxs]).reshape((1,)),
                np.array(productMapping[product_idxs]).reshape((1,))])

        return self.predict([
            np.array([customerMapping[customer_idx] for customer_idx in customer_idxs]),
            np.array([productMapping[product_idx] for product_idx in product_idxs])
        ])

This is the basis of our model class. It includes a constructor with some code to instantiate the collaborative filtering model using Keras APIs, and a “rate” function that we can use to make a prediction with our model—namely, what rating a customer would give to a particular product.

We can create an instance of the model like so:

model = DeepCollaborativeFiltering(n_customers, n_products, n_factors)
model.summary()

Now we are ready to start training our model. We can do this by setting a few hyperparameters:

bs = 64
val_per = 0.25
epochs = 3

These are hyperparameters that control the training process. They are typically set before training begins, unlike model parameters, which are learned during training. Setting the right values for hyperparameters can have a significant effect on the quality of your model. For now, let’s just set some default values for them. In a later chapter we’ll see how to use Kubeflow to tune hyperparameters.

We are now ready to run the training code:

# Compile with the Adam optimizer and the mean squared logarithmic error loss
# (mean_squared_logarithmic_error is assumed to be imported from keras.losses
# earlier in the notebook).
model.compile(optimizer = 'adam', loss = mean_squared_logarithmic_error)
model.fit(x = [customer_idxs, product_idxs], y = ratings,
        batch_size = bs, epochs = epochs, validation_split = val_per)
print('Done training!')

Once the training is complete, you should see some results, like this:

Train on 100188 samples, validate on 33397 samples
Epoch 1/3
100188/100188 [==============================]
- 21s 212us/step - loss: 0.0105 - val_loss: 0.0186
Epoch 2/3
100188/100188 [==============================]
- 20s 203us/step - loss: 0.0092 - val_loss: 0.0188
Epoch 3/3
100188/100188 [==============================]
- 21s 212us/step - loss: 0.0078 - val_loss: 0.0192
Done training!
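
Before exporting, you can sanity-check the trained model with the rate helper defined earlier; the indices here are arbitrary examples, not values from the dataset:

# Predicted rating for one (customer, product) pair, using integer indices.
print(model.rate(0, 0))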

Congratulations: you’ve successfully trained a TensorFlow model in a Jupyter notebook. But we’re not quite done yet—in order to make use of our model later, we should first export it. You can do this by first setting up the export destination using the Minio client:

directorystream = minioClient.get_object('data', 'recommender/directory.txt')
directory = ""
for d in directorystream.stream(32*1024):
    directory += d.decode('utf-8')
arg_version = "1"
export_path = 's3://models/' + directory + '/' + arg_version + '/'
print ('Exporting trained model to', export_path)

Once you have set up your export destination, you can then export the model:

# inputs/outputs
tensor_info_users = tf.saved_model.utils.build_tensor_info(model.input[0])
tensor_info_products = tf.saved_model.utils.build_tensor_info(model.input[1])
tensor_info_pred = tf.saved_model.utils.build_tensor_info(model.output)

print ("tensor_info_users", tensor_info_users.name)
print ("tensor_info_products", tensor_info_products.name)
print ("tensor_info_pred", tensor_info_pred.name)

# signature
prediction_signature = (tf.saved_model.signature_def_utils.build_signature_def(
        inputs={"users": tensor_info_users, "products": tensor_info_products},
        outputs={"predictions": tensor_info_pred},
        method_name=tf.saved_model.signature_constants.PREDICT_METHOD_NAME))
# export
legacy_init_op = tf.group(tf.tables_initializer(), name='legacy_init_op')
builder = tf.saved_model.builder.SavedModelBuilder(export_path)
builder.add_meta_graph_and_variables(
      sess, [tf.saved_model.tag_constants.SERVING],
      signature_def_map={
        tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY:
          prediction_signature,
      },
      legacy_init_op=legacy_init_op)
builder.save()

Now we’re ready to use this model to serve predictions, as we’ll learn in a later chapter. But before that, let’s first take a look at how to deploy this training job using Kubeflow.

Deploying a TensorFlow Training Job

So far we have done some TensorFlow training using Jupyter notebooks, which is a great way to prototype and experiment. But soon we may discover that our prototype is insufficient—perhaps we need to refine the model using more data, or perhaps we need to train the model using specialized hardware. Sometimes we may even need to continuously run the training job because our model is constantly evolving. Perhaps most importantly, our model has to be deployable to production, where it can serve actual customer requests.

In order to handle these requirements, our training code must be easily packageable and deployable to various different environments. One of the ways to achieve this is to use TFJob—a Kubernetes custom resource (implemented using Kubernetes operator tf-operator) that you can use to run TensorFlow training jobs on Kubernetes.

We’ll start by deploying our recommender as a single-container TFJob. Since we already have a Python notebook, exporting it as a Python file is fairly simple—just select “File,” then “Download as” and select “Python.” This should save your notebook as a ready-to-execute Python file.
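
If you prefer the command line, Jupyter’s nbconvert tool performs the same conversion:

jupyter nbconvert --to script Recommender_Kubeflow.ipynb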

The next step is to package the training code in a container. This can be done with the following Dockerfile.

Example 7-1. TF Job Dockerfile
FROM  tensorflow/tensorflow:1.15.2-py3
RUN pip3 install --upgrade pip
RUN pip3 install pandas --upgrade
RUN pip3 install keras --upgrade
RUN pip3 install minio --upgrade
RUN mkdir -p /opt/kubeflow
COPY Recommender_Kubeflow.py /opt/kubeflow/
ENTRYPOINT ["python3", "/opt/kubeflow/Recommender_Kubeflow.py"]

Next, we need to build this container along with its required libraries and push the container image to a repository:

docker build -t kubeflow/recommenderjob:1.0 .
docker push kubeflow/recommenderjob:1.0

Once that’s done, we are ready to create the specification for a TFJob.

Example 7-2. Single-Container TensorFlow Job Example
apiVersion: "kubeflow.org/v1"   1
kind: "TFJob"                   2
metadata:
  name: "recommenderjob"        3
spec:
  tfReplicaSpecs:               4
    Worker:
      replicas: 1
    restartPolicy: Never
    template:
      spec:
        containers:
        - name: tensorflow image: kubeflow/recommenderjob:1.0
  1. The apiVersion field specifies which version of the TFJob custom resource you are using. The corresponding version (in this case v1) needs to be installed in your Kubeflow cluster.

  2. The kind field identifies the type of the custom resource—in this case a TFJob.

  3. The metadata field is common to all Kubernetes objects and is used to uniquely identify the object in the cluster—you can add fields like name, namespace, and labels here.

  4. The most important part of the schema is tfReplicaSpecs. This is the actual description of your TensorFlow training cluster and its desired state. For this example, we just have a single worker replica. In the following section, we’ll examine this field further.

There are a few other optional configurations for your TFJob, including the following (a combined example appears after this list):

activeDeadlineSeconds

How long to keep this job active before the system can terminate it. If this is set, the system will kill the job after the deadline expires.

backoffLimit

How many times to keep retrying this job before marking it as failed. For example, setting this to 3 means that if a job fails 3 times, the system will stop retrying.

cleanPodPolicy

Configures whether or not to clean up the Kubernetes pods after the job completes. Setting this policy can be useful to keep pods for debugging purposes. This can be set to All (all pods are cleaned up), Running (only running pods are cleaned up), or None (no pods are cleaned up).
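
As a sketch (the values here are illustrative, not recommendations), these optional fields sit directly under spec, alongside tfReplicaSpecs:

apiVersion: "kubeflow.org/v1"
kind: "TFJob"
metadata:
  name: "recommenderjob"
spec:
  activeDeadlineSeconds: 3600   # kill the job if it runs longer than an hour
  backoffLimit: 3               # give up after three failed attempts
  cleanPodPolicy: None          # keep all pods around for debugging
  tfReplicaSpecs:
    Worker:
      replicas: 1
      restartPolicy: Never
      template:
        spec:
          containers:
          - name: tensorflow
            image: kubeflow/recommenderjob:1.0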

Deploy the TFJob to your cluster:

kubectl apply -f recommenderjob.yaml

You can monitor the status of the TFJob with the following command:

kubectl describe tfjob recommenderjob

This should display something like this:

Status:
  Completion Time:  2019-05-18T00:58:27Z
  Conditions:
    Last Transition Time:  2019-05-18T02:34:24Z
    Last Update Time:      2019-05-18T02:34:24Z
    Message:               TFJob recommenderjob is created.
    Reason:                TFJobCreated
    Status:                True
    Type:                  Created
    Last Transition Time:  2019-05-18T02:38:28Z
    Last Update Time:      2019-05-18T02:38:28Z
    Message:               TFJob recommenderjob is running.
    Reason:                TFJobRunning
    Status:                False
    Type:                  Running
    Last Transition Time:  2019-05-18T02:38:29Z
    Last Update Time:      2019-05-18T02:38:29Z
    Message:               TFJob recommenderjob successfully completed.
    Reason:                TFJobSucceeded
    Status:                True
    Type:                  Succeeded
  Replica Statuses:
    Worker:
      Succeeded:  1

Note that the status field contains a list of job conditions, which represent when the job transitioned into each state. This is useful for debugging—if the job failed, the reason for the job’s failure would appear here.
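
You can also inspect the training logs from the worker pod itself. The pod name below follows the tf-operator convention of job name plus replica type and index; verify the exact name with kubectl get pods:

kubectl get pods | grep recommenderjob
kubectl logs recommenderjob-worker-0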

So far we have trained a fairly simple and straightforward model with a modest number of training samples. In real life, learning more complex models may require significantly more training samples or model parameters. Such models can be too large and computationally complex to be handled by one machine. This is where distributed training comes in.

Distributed Training

By now we’ve deployed a single-worker TensorFlow job with Kubeflow. It is called “single-worker” because everything from hosting the data to executing the actual training steps are done on a single machine. However, as models become more complex, a single machine is often insufficient—we may need to distribute the model or the training samples over several networked machines. TensorFlow supports a distributed training mode, in which training is performed in parallel over several worker nodes.

Distributed training typically comes in two flavors: data parallelism and model parallelism. In data parallelism, the training data is partitioned into chunks, and the same training code runs on each chunk. At the end of each training step, each worker communicates its updates to all other nodes. Model parallelism is the opposite—the same training data is used in all workers, but the model itself is partitioned. At the end of each step, each worker is responsible for synchronizing the shared parts of the model.

The TFJob interface supports multi-worker distributed training. Conceptually, a TFJob is a logical grouping of all resources related to a training job, including pods and services. In Kubeflow, each replicated worker or parameter server is scheduled on its own single-container pod. In order for the replicas to synchronize with each other, each replica needs to expose itself through an endpoint, which is a Kubernetes internal service. Grouping these resources logically under a parent resource (which is the TFJob) allows these resources to be co-scheduled and garbage collected together.
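
Under the hood, the operator wires the replicas together by injecting a TF_CONFIG environment variable into each pod, which TensorFlow’s distributed runtime reads to discover its peers. A sketch of what it might contain for a two-worker job (the hostnames and port are illustrative) looks like this:

{
  "cluster": {
    "worker": ["mnist-worker-0:2222", "mnist-worker-1:2222"]
  },
  "task": {"type": "worker", "index": 0}
}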

In this section we’ll deploy a simple MNist example with distributed training. The TensorFlow training code is provided for you at this GitHub repo.

Let’s take a look at the yaml file for the distributed TensorFlow job.

apiVersion: "kubeflow.org/v1"
kind: "TFJob"
metadata:
  name: "mnist"
  namespace: kubeflow
spec:
  cleanPodPolicy: None
  tfReplicaSpecs:
    Worker:
      replicas: 2
      restartPolicy: Never
      template:
        spec:
          containers:
            - name: tensorflow
              image: gcr.io/kubeflow-ci/tf-mnist-with-summaries:1.0
              command:
                - "python"
                - "/var/tf_mnist/mnist_with_summaries.py"
                - "--log_dir=/train/logs"
                - "--learning_rate=0.01"
                - "--batch_size=150"
              volumeMounts:
                - mountPath: "/train"
                  name: "training"
          volumes:
            - name: "training"
              persistentVolumeClaim:
                claimName: "tfevent-volume"

Note that the tfReplicaSpecs field can contain several different replica types. In a typical TensorFlow training cluster, the possible roles are as follows:

Chief

Responsible for orchestrating computational tasks, emitting events, and checkpointing the model.

Parameter Servers (PS)

Provide a distributed data store for the model parameters.

Worker

This is where the computations and training actually happen. When a Chief node is not explicitly defined (as in the above example), one of the workers acts as the Chief node.

Evaluator

The evaluators can be used to compute evaluation metrics as the model is trained.

A replica spec contains a number of properties that describe its desired state:

replicas

How many replicas should be spawned for this replica type.

template

A PodTemplateSpec that describes the pod to create for each replica.

restartPolicy

Determines whether pods will be restarted when they exit. The allowed values are as follows:

Always

Means the pod will always be restarted. This policy is good for parameter servers since they never exit and should always be restarted in the event of failure.

OnFailure

Means the pod will be restarted if the pod exits due to failure. A non-zero exit code indicates a failure. An exit code of 0 indicates success and the pod will not be restarted. This policy is good for chief and workers.

ExitCode

Means the restart behavior is dependent on the exit code of the tensorflow container as follows:

  • 0 indicates the process completed successfully and will not be restarted.

  • 1-127 indicates a permanent error and the container will not be restarted.

  • 128-255 indicates a retryable error and the container will be restarted. This policy is good for the chief and workers.

Never

Means pods that terminate will never be restarted. This policy should rarely be used because Kubernetes will terminate pods for any number of reasons (e.g., node becomes unhealthy) and this will prevent the job from recovering.

Once you have the TFJob spec written, deploy it to your Kubeflow cluster:

kubectl apply -f dist-mnist.yaml

Monitoring the job status is similar to a single-container job:

kubectl describe tfjob mnist

This should output something like the following:

Status:
  Completion Time:  2019-05-12T00:58:27Z
  Conditions:
    Last Transition Time:  2019-05-12T00:57:31Z
    Last Update Time:      2019-05-12T00:57:31Z
    Message:               TFJob dist-mnist-example is created.
    Reason:                TFJobCreated
    Status:                True
    Type:                  Created
    Last Transition Time:  2019-05-12T00:58:21Z
    Last Update Time:      2019-05-12T00:58:21Z
    Message:               TFJob dist-mnist-example is running.
    Reason:                TFJobRunning
    Status:                False
    Type:                  Running
    Last Transition Time:  2019-05-12T00:58:27Z
    Last Update Time:      2019-05-12T00:58:27Z
    Message:               TFJob dist-mnist-example successfully completed.
    Reason:                TFJobSucceeded
    Status:                True
    Type:                  Succeeded
  Replica Statuses:
    Worker:
      Succeeded:  2

Notice that the Replica Statuses field shows a breakdown of status by each replica type. The TFJob is successfully completed when all of its workers complete. If any worker has failed, then the TFJob’s status would be failed as well.

Using GPUs

GPUs (graphics processing units) are processors composed of many smaller, specialized cores. Originally designed to render graphics, GPUs are increasingly used for massively parallel computational tasks, such as machine learning. Unlike CPUs, GPUs are ideal for distributing large workloads over their many cores and executing them concurrently.

To use GPUs for training, your Kubeflow cluster needs to be pre-configured to enable GPUs. Refer to the appendix section to see how to do this for your preferred cloud provider. After enabling GPUs on the cluster, you can enable GPUs on the specific replica type in the training spec by modifying the command-line arguments, for example:

    Worker:
      replicas: 4
      restartPolicy: Never
      template:
        spec:
          containers:
            - name: tensorflow
              image: kubeflow/tf-dist-mnist-test:1.0
              args:
                - python
                - /var/tf_dist_mnist/dist_mnist.py
                - --num_gpus=1
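
Note that the --num_gpus flag only tells the training script to use a GPU; for Kubernetes to schedule the pod onto a GPU node, the container typically also needs to request the device through a resources limit. A sketch combining the two (the resources stanza is the addition) looks like this:

    Worker:
      replicas: 4
      restartPolicy: Never
      template:
        spec:
          containers:
            - name: tensorflow
              image: kubeflow/tf-dist-mnist-test:1.0
              args:
                - python
                - /var/tf_dist_mnist/dist_mnist.py
                - --num_gpus=1
              resources:
                limits:
                  nvidia.com/gpu: 1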

Using Other Frameworks for Distributed Training

Kubeflow is designed to be a multi-framework machine learning platform. That means the schema for distributed training can easily be extended to other frameworks. As of the time of this writing, there are a number of operators written to provide first-class support for other frameworks, including PyTorch and Caffe2.

As an example, here is what a PyTorch training job spec looks like:

apiVersion: "kubeflow.org/v1"
kind: "PyTorchJob"
metadata:
  name: "pytorch-dist"
spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      restartPolicy: OnFailure
      template:
        spec:
          containers:
            - name: pytorch
              image: gcr.io/kubeflow-ci/pytorch-dist-sendrecv-test:1.0
    Worker:
      replicas: 3
      restartPolicy: OnFailure
      template:
        spec:
          containers:
            - name: pytorch
              image: gcr.io/kubeflow-ci/pytorch-dist-sendrecv-test:1.0

As you can see, the format is very similar to that of TFJobs. The only difference is in the replica types.

Training a Model Using Scikit-Learn

Thus far we have seen how to use the built-in operators in Kubeflow to train machine learning models. However, there are many frameworks and libraries for which there are no Kubeflow operators. In these cases you can still use your favorite frameworks in Jupyter notebooks5 or in custom Docker images.

Scikit-learn is an open source Python library for machine learning built on top of NumPy for high-performance linear algebra and array operations. The project started as scikits.learn, a Google Summer of Code project by David Cournapeau. Its name stems from the notion that it is a “SciKit” (SciPy Toolkit), a separately developed and distributed third-party extension to SciPy. Scikit-learn is one of the most popular, and one of the best-maintained, machine learning libraries on GitHub. Training models with scikit-learn is supported in Kubeflow as generic Python code, with no specific operators for distributed training.

The library supports algorithms such as k-nearest neighbors, random forests, and SVMs, among others, and pairs well with gradient-boosting libraries such as XGBoost. Scikit-learn is widely used in Kaggle competitions as well as at prominent tech companies. It helps with preprocessing, dimensionality reduction, classification, regression, clustering, and model selection.

In this section, we will explore how to train models in Kubeflow by using scikit-learn on the 1994 US Census dataset. This example is based on this implementation of Anchor explanations for income prediction, and leverages an extract from the 1994 Census dataset. This dataset includes several categorical variables and continuous features, including age, education, marital status, occupation, salary, relationship, race, sex, native country, and capital gains and losses. We will use a Random Forest algorithm, an ensemble learning method for classification, regression, and other tasks that operates by constructing a multitude of decision trees at training time and outputting the class that is the mode of the classes (classification) or the mean prediction (regression) of the individual trees.

You can download the notebook from this book’s GitHub repo.

Starting a New Notebook Session

Now let’s start by creating a new notebook. Similar to TensorFlow training, you can do this by navigating to the “Notebook Servers” panel in your Kubeflow dashboard, then clicking “New Server” and following the on-screen instructions. For this example, we can use the tensorflow-1.15.2-notebook-cpu:1.0 image.

Tip

When working in Kubeflow, an easy way to take advantage of GPU resources to accelerate your scikit-learn work is to switch your notebook server to a GPU type.

When the notebook server starts up, click on the “Upload” button in the top right corner and upload the IncomePrediction.ipynb file. Click on the file to start a new session.

Data preparation

The first few sections of the notebook involve importing libraries and reading the data. Then we proceed to feature preparation.6 For feature transformation we use Scikit-learn pipelines. A pipeline makes it easier to feed the model consistent data.

For our Random Forest training we need to define ordinal features (which are standardized) and categorical features (which are one-hot encoded):

ordinal_features = [x for x in range(len(feature_names))
                if x not in list(category_map.keys())]
ordinal_transformer = Pipeline(steps=[
    ('imputer',  SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())])


categorical_features = list(category_map.keys())
categorical_transformer = Pipeline(steps=[('imputer',
    SimpleImputer(strategy='median')),
    ('onehot', OneHotEncoder(handle_unknown='ignore'))])

Tip

Many real-world datasets contain missing values, encoded as data-specific placeholders such as blanks or NaNs. Such datasets are typically incompatible with scikit-learn estimators, which assume that all values are numerical. There are multiple strategies for dealing with missing data. A basic strategy is to discard entire rows and/or columns containing missing values, which comes at the price of losing data. A better strategy is to impute the missing values—to infer them from the known part of the data. SimpleImputer is a scikit-learn class that handles missing data by replacing NaN values according to a predefined strategy (such as the column median).
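
As a minimal standalone illustration (with made-up values), SimpleImputer fills missing entries with the column median:

import numpy as np
from sklearn.impute import SimpleImputer

X = np.array([[1.0, 2.0],
              [np.nan, 3.0],
              [7.0, np.nan]])

imputer = SimpleImputer(strategy='median')
print(imputer.fit_transform(X))
# The NaN in column 0 becomes 4.0 (median of 1 and 7);
# the NaN in column 1 becomes 2.5 (median of 2 and 3).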

Tip

Scikit-learn’s OneHotEncoder encodes categorical features as a one-hot numeric array. The encoder takes an array-like of integers or strings, denoting the values taken on by categorical (discrete) features, and encodes them using a one-hot (aka “one-of-K” or “dummy”) scheme. This creates a binary column for each category and returns a sparse matrix or dense array (depending on the sparse parameter).
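
A minimal standalone illustration, again with toy data:

from sklearn.preprocessing import OneHotEncoder

colors = [['red'], ['green'], ['blue'], ['green']]
encoder = OneHotEncoder(handle_unknown='ignore')
encoded = encoder.fit_transform(colors)

print(encoder.categories_)  # [array(['blue', 'green', 'red'], dtype=object)]
print(encoded.toarray())    # one binary column per category, one row per sample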

Once features are defined, we can use a column transformer to combine them together:

preprocessor = ColumnTransformer(transformers=[
    ('num', ordinal_transformer, ordinal_features),
    ('cat', categorical_transformer, categorical_features)])
preprocessor.fit(X_train)

For our example, the transformer itself looks like the following:

ColumnTransformer(n_jobs=None, remainder='drop', sparse_threshold=0.3,
  transformer_weights=None,
  transformers=[('num',
    Pipeline(memory=None,
      steps=[
        ('imputer', SimpleImputer(add_indicator=False,
          copy=True,
          fill_value=None,
          missing_values=nan,
          strategy='median',
          verbose=0)),
        ('scaler', StandardScaler(copy=True,
          with_mean=True,
          with_std=True))],
        verbose=False),
      [0, 8, 9, 10]),
    ('cat',
     Pipeline(memory=None,
       steps=[('imputer', SimpleImputer(add_indicator=False,
         copy=True,
         fill_value=None,
         missing_values=nan,
         strategy='median',
         verbose=0)),
       ('onehot', OneHotEncoder(categories='auto',
         drop=None,
         dtype=<class 'numpy.float64'>,
         handle_unknown='ignore',
         sparse=True))],
       verbose=False),
       [1, 2, 3, 4, 5, 6, 7, 11])],
    verbose=False)

As a result of this transformation we have our data in the form of features ready for training.

Scikit-learn Training

Once we have our features prepared, we can proceed with the training. Here we will use the RandomForestClassifier provided by the Scikit-learn library:

Tip

The set of supported algorithms and their specific features is one of the main drivers behind picking a framework for a machine learning implementation. Even the same algorithm implemented in different frameworks provides slightly different features that might (or might not) be important for your specific dataset.

np.random.seed(0)
clf = RandomForestClassifier(n_estimators=50)
clf.fit(preprocessor.transform(X_train), Y_train)

Once training is done, we can evaluate the results by comparing the model’s predictions with the ground truth:

predict_fn = lambda x: clf.predict(preprocessor.transform(x))
print('Train accuracy: ', accuracy_score(Y_train, predict_fn(X_train)))
print('Test accuracy: ', accuracy_score(Y_test, predict_fn(X_test)))

Which returns the following results:

Train accuracy:  0.9655333333333334
Test accuracy:  0.855859375

At this point the model is created and can be used directly by exporting it (see below). One of the most important attributes of a model is its explainability. Although model explainability is mostly used in model serving, it is also important during model creation, for two main reasons:

  • If explainability is important for model serving, then during model creation we often need to validate that the model we create is explainable.

  • Many of the model explanation methods require additional calculations during model creation.

Based on this, we will show how to implement model explainability7 during model creation.

Explaining the Model

For model explanation, we are using Anchors, part of Seldon’s Alibi project.

The algorithm provides model-agnostic (black box) and human-interpretable explanations suitable for classification models applied to images, text, and tabular data. The continuous features are discretized into quantiles (e.g., deciles), so they become more interpretable. The features in a candidate anchor are kept constant (same category or bin for discretized features) while we sample the other features from the training set.

explainer = AnchorTabular(
    predict_fn, feature_names, categorical_names=category_map, seed=1)
explainer.fit(X_train, disc_perc=[25, 50, 75])

This creates the following tabular Anchor:

AnchorTabular(meta={
    'name': 'AnchorTabular',
    'type': ['blackbox'],
    'explanations': ['local'],
    'params': {'seed': 1, 'disc_perc': [25, 50, 75]}
})

Now we can get an anchor for the prediction of the first observation in the test set. An anchor is a sufficient condition—that is, when the anchor holds, the prediction should be the same as the prediction for this instance:

idx = 0
class_names = adult.target_names
print('Prediction: ', class_names[explainer.predictor(X_test[idx].reshape(1, -1))[0]])

Which returns the following:

Prediction:  <=50K

We set the precision threshold to 0.95. This means that predictions on observations where the anchor holds will be the same as the prediction on the explained instance at least 95% of the time. Now we can get an explanation for this prediction:

explanation = explainer.explain(X_test[idx], threshold=0.95)
print('Anchor: %s' % (' AND '.join(explanation.anchor)))
print('Precision: %.2f' % explanation.precision)
print('Coverage: %.2f' % explanation.coverage)

Which returns:

Anchor: Marital Status = Separated AND Sex = Female
Precision: 0.95
Coverage: 0.18

This tells us that the main factors behind the decision are marital status (Separated) and sex (Female). Anchors might not be found for all points. Let’s try getting an anchor for a different observation in the test set—one for which the prediction is >50K:

idx = 6
class_names = adult.target_names
print('Prediction: ', class_names[explainer.predictor(X_test[idx].reshape(1, -1))[0]])

explanation = explainer.explain(X_test[idx], threshold=0.95)
print('Anchor: %s' % (' AND '.join(explanation.anchor)))
print('Precision: %.2f' % explanation.precision)
print('Coverage: %.2f' % explanation.coverage)

This will return:

Prediction:  >50K
Could not find an result satisfying the 0.95 precision constraint.
Now returning the best non-eligible result.
Anchor: Capital Loss > 0.00 AND Relationship = Husband AND
    Marital Status = Married AND Age > 37.00 AND
    Race = White AND Country = United-States AND Sex = Male
Precision: 0.71
Coverage: 0.05

Because the dataset is imbalanced (roughly a 25:75 high:low earner proportion), feature ranges corresponding to low earners are oversampled during the sampling stage. As a result, no anchor satisfying the precision constraint is found in this case. This behavior is useful because it can point out an imbalanced dataset, but it can also be addressed by producing balanced datasets so that anchors can be found for either class.

Exporting Model

In order to use the created model for serving, we need to export it. This is done with joblib, the model-persistence utility recommended by Scikit-learn:

from joblib import dump
dump(clf, '/tmp/job/income.joblib')

This exports the model in joblib format, which can then be loaded, for example, by a scikit-learn model server for inference.
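
A quick way to verify the export (or to load the model later outside this notebook) is joblib’s matching load call:

from joblib import load

clf_restored = load('/tmp/job/income.joblib')
print(clf_restored.predict(preprocessor.transform(X_test[:5])))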

Integration into Pipelines

Regardless of which Python-based machine learning library you want to use, if Kubeflow doesn’t have an operator for it, you can simply write your code as normal and then containerize it. To take the notebook we built in this chapter and use it as a pipeline stage, see “Using an Entire Notebook as a Dataprep Pipeline Stage”. Here we can use file_output to upload the resulting model to our artifact tracking system, but you can also use the persistent volume mechanism.

Putting It Together

In this chapter, we have taken a look at how to train machine learning models in Kubeflow using two very different frameworks—TensorFlow and Scikit-learn.

We learned how to build a collaborative filtering recommendation system using TensorFlow. We used Kubeflow to create a notebook session, where we’ve prototyped a TensorFlow model with Keras APIs, and then used the TFJob APIs to deploy our training job to a Kubernetes cluster. Finally, we’ve looked at how to use TFJob for distributed training.

We also learned how to train a generic Python model using Scikit-learn, a framework that is not natively supported by Kubeflow. Chapter 9 looks at how to integrate unsupported non-Python machine learning systems, which is a bit more complicated. While Kubeflow’s first-party training operators can simplify your work, it’s important to remember that you aren’t limited to them.

In the next chapter we will look at how to serve the model that we’ve trained in this chapter.

1 You can download the notebook from the book’s GitHub repo.

2 You can download the user purchase history data from this GitHub site.

3 Currently Kubeflow provides CPU and GPU images with TensorFlow 1.15.2 and 2.1.0 or you can use a custom image.

4 The examples in this chapter use TensorFlow 1.15.2. Examples with TensorFlow 2.1.0 can be found on GitHub.

5 The languages currently supported by Jupyter notebooks include Python, R, Julia, and Scala.

6 See Chapter 5 for an in-depth discussion of feature preparation.

7 Refer to this blog post for more information on model explainability.
