Chapter 5. Data and Feature Preparation

Machine Learning algorithms are only as good as their training data. Getting good data for training involves data and feature preparation.

Data preparation is the process of sourcing the data and making sure it’s valid. This is a multistep process1 that can include data collection, augmentation, statistics calculation, schema validation, outlier pruning, and various validation techniques. Not having enough data can lead to overfitting, missing significant correlations, and more. Putting in the effort to collect more records and information about each sample during data preparation can considerably improve the model.2

Feature preparation (sometimes called feature engineering) refers to transforming the raw input data into features that the machine learning model can use.3 Poor feature preparation can lose important relationships, such as a linear model whose non-linear terms were never expanded, or a deep learning model trained on images with inconsistent orientation.

Small changes in data and feature preparation can lead to significantly different model outputs. An iterative approach is best for both data and feature preparation: revisit them as your understanding of the problem and model changes. Kubeflow Pipelines makes it easier for us to iterate on our data and feature preparation. We will explore how to use hyperparameter tuning to iterate in Chapter 10.

In this chapter, we will cover different approaches to data and feature preparation and demonstrate how to make them repeatable by using pipelines. We assume you are already familiar with local tools. As such, we’ll start by covering how to structure our local code for pipelines, and then move on to more scalable distributed tools. Once we’ve explored the tools, we’ll put them together in a pipeline, using the examples from “Introducing Our Case Studies”.

Deciding on the Correct Tooling

There are a wide variety of data and feature preparation tools.4 We can categorize them into distributed and local only. Local tools run on a single machine and offer a great amount of flexibility. Distributed tools run on many machines so they can handle larger and more complex tasks. With two very distinct paths of tooling, making the wrong decision here can require substantial changes in code later.

If the input data size is relatively small, a single machine offers you all of the tools you are used to. Larger data sizes tend to require distributed tools for the entire pipeline or just as a sampling stage. Even with smaller data sets, distributed systems, like Apache Spark, Dask, or TFX with Beam, can be faster but may require learning new tools.5

Using the same tool for all of the data and feature preparation activities is not necessary. Using multiple tools is especially common when working with different data sets where using the same tools would be inconvenient. Kubeflow Pipelines allows you to split the implementation into multiple steps and connect them (even if they use different languages) into a cohesive system.

Local Data and Feature Preparation

Working locally limits the scale of data but offers the most comprehensive range of tools. A common way to implement data and feature preparation is with Jupyter notebooks. In Chapter 4, we covered how to turn parts of a notebook into a pipeline, and here we'll look at how to structure our data and feature preparation code to make this easy.

Using notebooks for data preparation can be a great way to start exploring the data. Notebooks can be especially useful at this stage since we often have the least amount of understanding, and because using visualizations to understand our data can be quite beneficial.

Fetching the Data

For our mailing list example, we use data from public archives on the internet. Ideally, you want to connect to a database, stream, or other data repository. However, even in production, fetching web data can be necessary. First, we’ll implement our data fetching algorithm, which takes an Apache Software Foundation (ASF) project’s email list location along with the year from which to fetch messages. Example 5-1 returns the path to the records it fetches so we can use that as the input to the next pipeline stage.

Note

The function downloads at most one year of data, and it sleeps between calls. This is to prevent overwhelming the ASF mail archive servers. The ASF is a charity; please be mindful of that when downloading data and do not abuse their service.

Example 5-1.
def download_data(year: int) -> str:
    # The imports are inline here so Kubeflow can serialize the function correctly.
    from datetime import datetime
    from lxml import etree
    from requests import get
    from time import sleep
    import json

    def scrapeMailArchives(mailingList: str, year: int, month: int):
        # Ugly xpath code goes here. See the example repo if you're curious.
        ...

    datesToScrape = [(year, i) for i in range(1, 2)]

    records = []
    for y, m in datesToScrape:
        print(m, "-", y)
        records += scrapeMailArchives("spark-dev", y, m)

    output_path = '/data_processing/data.json'
    with open(output_path, 'w') as f:
        json.dump(records, f)

    return output_path

This code downloads all of the mailing list data for a given year and saves it to a known path. A persistent volume needs to be mounted at that path so the data can move between stages when we build our pipeline.

You may have a data dump as part of the machine learning pipeline, or a different system or team may provide one. For data on GCS or a PV, you can use the built-in components google-cloud/storage/download or filesystem/get_subdirectory to load the data instead of writing a custom function.
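For example, a minimal sketch of loading one of these prebuilt components from a local copy of the Kubeflow Pipelines components (the component path and input name here are assumptions; check the component.yaml for the real interface):

import kfp

# Assumes a local checkout of the Kubeflow Pipelines components, as with the
# TFX components later in this chapter; your path and version may differ.
gcs_download_op = kfp.components.load_component_from_file(
    "pipelines-0.2.5/components/google-cloud/storage/download/component.yaml")

def download_stage(gcs_path: str):
    # Produces an output artifact for later stages, much like the return
    # value of our download_data function.
    return gcs_download_op(gcs_path=gcs_path)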

Data Cleaning: Filtering Out the Junk

Now that we've loaded our data, it's time to do some simple data cleaning. Local tools are more common, so we'll focus on them first. While data cleaning often depends on domain expertise, there are standard tools to assist with common tasks. A first step can be validating input records by checking the schema. That is to say, we check that the fields are present and of the right type.

To check the schema in the mailing list example, we ensure a sender, subject, and body all exist. To convert this into an independent component, we’ll make our function take a parameter for the input path and return the file path to the cleaned records. The amount of code it takes to do this is relatively small, shown in Example 5-2.

Example 5-2.
def clean_data(input_path: str) -> str:
    import json
    import pandas as pd

    print("loading records...")
    with open(input_path, 'r') as f:
        records = json.load(f)
    print("records loaded")

    df = pd.DataFrame(records)
    # Drop records without a subject, body, or sender
    cleaned = df.dropna(subset=["subject", "body", "from"])

    output_path_hdf = '/data_processing/clean_data.hdf'
    cleaned.to_hdf(output_path_hdf, key="clean")

    return output_path_hdf

There are many other standard data quality techniques besides dropping missing fields. Two of the more popular ones are imputing missing data6 and analyzing/removing outliers, which may be the result of incorrect measurements. Regardless of which additional general techniques you decide to perform, you can simply add them to your data cleaning function.
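For instance, a minimal sketch of what adding these techniques to clean_data might look like with pandas (the column choice and threshold are illustrative assumptions):

    # Impute a missing body with an empty string rather than dropping the record.
    df["body"] = df["body"].fillna("")

    # Drop records whose body length is an extreme outlier (above the 99.9th
    # percentile here); the cutoff is a placeholder you should tune.
    body_length = df["body"].str.len()
    cleaned = df[body_length <= body_length.quantile(0.999)]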

Domain-specific data cleaning tools can also be beneficial. In the mailing list example, one potential source of noise in our data could be spam messages. One way to solve this would be by using SpamAssassin. We can add this package to our container as in Example 5-3. Adding system software not managed by pip on top of the notebook images is a bit more complicated because of permissions. Most containers run as root, making it simple to install new system packages; however, because of Jupyter, the notebook containers run as a less privileged user. Installing new packages like this requires switching to the root user and back, which is not common in other Dockerfiles.

Example 5-3.
ARG base
FROM $base
# Run as root for updates
USER root
# Install Spamassassin
RUN apt-get update && \
    apt-get install -yq spamassassin spamc && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/* && \
    rm -rf /var/cache/apt
# Switch back to the expected user
USER jovyan

Once you’ve created this Dockerfile, you’ll want to build it and push the resulting image somewhere that the Kubeflow cluster can access as in Example 2-8.

Pushing a new container is not enough to let Kubeflow know that we want to use it. When constructing a pipeline stage with func_to_container_op, you need to specify the base_image parameter in the func_to_container_op call. We'll show this when we bring the example together as a pipeline in Example 5-35.

Here we see the power of containers again. You can add the tools you need on top of the building blocks provided by Kubeflow rather than building everything from scratch.

Once the data is cleaned, it's time to make sure you have enough of it and, if not, to explore augmenting your data.

Formatting the Data

The correct format depends on which tool you're using to do the feature preparation. If you're sticking with the same tool you used for data preparation, the output format can be the same as the input. Otherwise, you might find this a good place to change formats. For example, when using Spark for data preparation and TensorFlow for training, we often implement the conversion to TFRecords here.
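As a sketch of what that conversion could look like with the TensorFlow APIs (the field names follow our mailing list example, and the output path is an assumption):

import tensorflow as tf

def write_tfrecords(records, output_path="/data_processing/data.tfrecord"):
    # Serialize each record as a tf.train.Example with two text features.
    with tf.io.TFRecordWriter(output_path) as writer:
        for record in records:
            features = tf.train.Features(feature={
                "subject": tf.train.Feature(bytes_list=tf.train.BytesList(
                    value=[record["subject"].encode("utf-8")])),
                "body": tf.train.Feature(bytes_list=tf.train.BytesList(
                    value=[record["body"].encode("utf-8")])),
            })
            writer.write(tf.train.Example(features=features).SerializeToString())
    return output_path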

Feature Preparation

How to do feature preparation depends on the problem. With the mailing list example we can write all kinds of text-processing functions and combine them into features as shown in Example 5-4.

Example 5-4.
    df['domains'] = df['links'].apply(extract_domains)
    df['isThreadStart'] = df['depth'] == '0'

    # Arguably, you could split building the dataset away from the actual witchcraft.
    from sklearn.feature_extraction.text import TfidfVectorizer

    bodyV = TfidfVectorizer()
    bodyFeatures = bodyV.fit_transform(df['body'])

    domainV = TfidfVectorizer()

    ## A couple of "None" domains really screwed the pooch on this one.
    ## Also, no lists, just space-separated domains.
    def makeDomainsAList(d):
        return ' '.join([a for a in d if a is not None])

    domainFeatures = domainV.fit_transform(
        df['domains'].apply(makeDomainsAList))

    from scipy.sparse import csr_matrix, hstack

    data = hstack([
        csr_matrix(df[[
            'containsPythonStackTrace', 'containsJavaStackTrace',
            'containsExceptionInTaskBody', 'isThreadStart'
        ]].to_numpy()), bodyFeatures, domainFeatures
    ])

So far, the example code is structured to allow you to turn each function into a separate pipeline stage, however other options exist. We’ll examine how to use the entire notebook as a pipeline stage in “Putting It Together in a Pipeline”.

There are data preparation tools beyond notebooks and Python, of course. Notebooks are not always the best tool, as they can be difficult to version control, and Python doesn't always have the libraries (or performance) you need. So we'll now look at how to use other available tools.

Custom Containers

Pipelines are not limited to notebooks or even to specific languages.7 Depending on the project, you may have a regular Python project, custom tooling, Python 2, or even FORTRAN code as an essential component.

For instance, in Chapter 9 we will use Scala to perform one step in our pipeline and present a sidebar on how to get started with an RStats container.

Sometimes you won't be able to find a container that matches your needs as closely as it did here. In those cases, you can take a generic base image and build on top of it, which we look at more in Chapter 9.

Beyond the need for custom containers, another reason you might choose to move beyond notebooks is to explore distributed tools.

Distributed Tooling

Using a distributed platform makes it possible to work with data sets that exceed a single machine's memory and can provide significantly better performance. Often the time to start using distributed tooling is when our problem has outgrown our initial notebook solution.

The two main data-parallel distributed systems in Kubeflow are Apache Spark and Google's Dataflow (via Apache Beam). Apache Spark has a larger install base and a wider variety of supported formats and libraries. Apache Beam underpins the TensorFlow Extended (TFX) project, an end-to-end ML tool that integrates smoothly with TF Serving for model inference. As it's the most tightly integrated, we'll start by exploring TFX on Apache Beam and then continue to the more standard Apache Spark.

TensorFlow Extended

The TensorFlow community has created an excellent set of integrated tools for everything from data validation to model serving. At present, TFX’s data tools are all built on top of Apache Beam, which has the most support for distributed processing on Google Cloud. If you want to use Kubeflow’s TFX components, you are limited to a single node; this may change in future versions.

Note

Apache Beam's Python support outside of Google Cloud's Dataflow is not as mature. TFX is a Python tool, so scaling it depends on Apache Beam's Python support. You can scale the job by using the GCP-only Dataflow component. As Apache Beam's support for Apache Flink and Spark improves, support may be added for scaling the TFX components in a portable manner.8

Kubeflow includes many of the TFX components in its pipeline system. TFX also has its own concept of pipelines; these are separate from Kubeflow pipelines, and in some cases TFX can be an alternative to Kubeflow. Here we will focus on the data and feature preparation components, since those are the simplest to use with the rest of the Kubeflow ecosystem.

Keeping your data quality: TensorFlow data validation

It's crucial to make sure data quality doesn't decline over time. Data validation allows us to ensure that the schema and distribution of our data are evolving only in expected ways and to catch data quality issues before they become production issues. TensorFlow Data Validation (TFDV) gives us the ability to validate our data.

To make the development process more straightforward, you should install TFX and TFDV locally. While the code could be evaluated only inside of Kubeflow, having the library locally will speed up your development work. Installing TFX and TFDV is a simple pip install, shown in Example 5-5.9

Example 5-5.
pip3 install tfx tensorflow-data-validation

Now let's look at how to use TFX/TFDV in Kubeflow's pipelines. The first step is loading the relevant components. As we discussed in the previous chapter, while Kubeflow does have a load_component function, it automatically resolves on master, making it unsuitable for production use cases. So we'll use load_component_from_file along with a local copy of the Kubeflow components, as in Example 4-5, to load our TFDV components. The basic components we need to load are an example generator (think data loader), the schema and statistics generators, and the validator itself. Loading the components is illustrated in Example 5-6.

Example 5-6.
tfx_csv_gen = kfp.components.load_component_from_file(
    "pipelines-0.2.5/components/tfx/ExampleGen/CsvExampleGen/component.yaml")
tfx_statistic_gen = kfp.components.load_component_from_file(
    "pipelines-0.2.5/components/tfx/StatisticsGen/component.yaml")
tfx_schema_gen = kfp.components.load_component_from_file(
    "pipelines-0.2.5/components/tfx/SchemaGen/component.yaml")
tfx_example_validator = kfp.components.load_component_from_file(
    "pipelines-0.2.5/components/tfx/ExampleValidator/component.yaml")

In addition to the components, we also need our data. The current TFX components pass data between pipeline stages using Kubeflow's file_outputs mechanism. This places the output into Minio, automatically tracking the artifacts related to the pipeline. To use TFDV on the recommender example's input, we first download it using a standard container operation, as in Example 5-7.

Example 5-7.
    fetch = kfp.dsl.ContainerOp(name='download',
                                image='busybox',
                                command=['sh', '-c'],
                                arguments=[
                                    'sleep 1;'
                                    'mkdir -p /tmp/data;'
                                    'wget ' + data_url +
                                    ' -O /tmp/data/results.csv'
                                ],
                                file_outputs={'downloaded': '/tmp/data'})
    # This expects a directory of inputs not just a single file
Tip

If we had the data on a persistent volume (say fetched in a previous stage), we could use the filesystem/get_file component.

Once you have the data loaded, TFX has a set of tools called example generators that ingest data. These support a few different formats, including CSV and TFRecord. There are also example generators for other systems, including Google's BigQuery. The variety of supported formats is not as wide as with Spark or Pandas, so you may need to pre-process the records with another tool.10 In our recommender example, we use the CSV component, as shown in Example 5-8.

Example 5-8.
    records_example = tfx_csv_gen(input_base=fetch.output)

Now that we have a channel of examples, we can use this as one of the inputs for TFDV. The recommended approach for creating a schema is to use TFDV to infer the schema. To be able to infer the schema, TFDV first needs to compute some summary statistics of our data. Example 5-9 illustrates the pipeline stages for both of these steps.

Example 5-9.
    stats = tfx_statistic_gen(input_data=records_example.output)
    schema_op = tfx_schema_gen(stats.output)

If we infer the schema each time, we are unlikely to catch schema changes. Instead, you should save the schema and reuse it in future runs for validation. The pipeline’s run web page has a link to the schema in Minio, and you can either fetch it or copy it somewhere using another component or container operation.

Regardless of where you persist the schema, you should inspect it before you start using it to validate data. To inspect the schema, import the TFDV library as shown in Example 5-10, download the schema locally (or to your notebook), and use the display_schema function from TFDV, as shown in Example 5-11.

Example 5-10.
import tensorflow_data_validation as tfdv
Example 5-11.
schema = tfdv.load_schema_text("schema_info_2")
tfdv.display_schema(schema)

If needed, you can modify the schema following the instructions in the evolution guide. As the name implies, these tools are focused on supporting schema evolution over time.
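For example, a minimal sketch of relaxing a feature's presence requirement and saving the adjusted schema for future runs (the feature name and output path are assumptions):

import tensorflow_data_validation as tfdv

schema = tfdv.load_schema_text("schema_info_2")
# Allow the "links" feature to be missing from some records.
links_feature = tfdv.get_feature(schema, "links")
links_feature.presence.min_fraction = 0.0
# Persist the adjusted schema so future pipeline runs validate against it.
tfdv.write_schema_text(schema, "schema_info_2_updated")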

Now that we know we’re using the right schema, let’s validate our data. The validate component takes in both the schema and the statistics we’ve generated, as shown in Example 5-12. You should replace the schema and statistics generation components with downloads of their outputs at production time.

Example 5-12.
    tfx_example_validator(stats=stats.outputs['output'],
                          schema=schema_op.outputs['output'])
Tip

Check the size of the rejected records before pushing to production. You may find that the data format has changed, and you need to use the schema evolution guide and possibly update the rest of the pipeline.
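To spot-check validation outside of the pipeline, you can download the generated statistics and schema and review the anomalies TFDV reports locally; a minimal sketch (the file names are assumptions):

import tensorflow_data_validation as tfdv

stats = tfdv.load_statistics("stats_from_run")
schema = tfdv.load_schema_text("schema_info_2")
anomalies = tfdv.validate_statistics(statistics=stats, schema=schema)
# In a notebook this renders a table of any features that violate the schema.
tfdv.display_anomalies(anomalies)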

TensorFlow Transform, with TensorFlow Extended on Beam

The TFX program for doing feature preparation is called TensorFlow Transform (TFT), and it integrates into the TensorFlow and Kubeflow ecosystems. As with TFDV, Kubeflow's TensorFlow Transform component currently does not scale beyond single-node processing. The biggest benefit of TFT is its integration into the TensorFlow Model Analysis tool (TFMA), simplifying feature preparation during inference.

We need to specify what transformations we want TFT to apply. Our TFT program should be in a separate file from the pipeline definition, although it is possible to in-line it as a string. To start with, we need some standard TFT imports, shown in Example 5-13.

Example 5-13.
import tensorflow as tf
import tensorflow_transform as tft
from tensorflow_transform.tf_metadata import schema_utils

Now that we’ve got the imports, it’s time to create the entry point into our code for the component, shown in Example 5-14.

Example 5-14.
def preprocessing_fn(inputs):

Inside this function is where we do our data transformations to produce our features. Not all features need to be transformed, which is why there is also a copy method to mirror the input to the output if you’re only adding features. With our mailing list example, we can compute the vocabulary, as shown in Example 5-15.

Example 5-15.
    outputs = {}
    # TFT business logic goes here
    outputs["body_stuff"] = tft.compute_and_apply_vocabulary(inputs["body"],
                                                             top_k=1000)
    return outputs

This function does not support arbitrary Python code. All transformations must be expressed as TensorFlow or TensorFlow Transform operations. TensorFlow operations work on one tensor at a time, but in data preparation we often want to compute something over all of our input data, and TensorFlow Transform's operations give us this ability. See the TFT Python docs or call help(tft) to see some starting operations.
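For instance, alongside the vocabulary above you could add a normalized numeric feature inside preprocessing_fn; a sketch assuming a numeric input column (the hypothetical "body_len" field is not part of our example data):

    # Analyze-and-transform: the mean and variance are computed over the full
    # dataset during the analyze phase, then applied to each record.
    outputs["body_len_scaled"] = tft.scale_to_z_score(inputs["body_len"])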

Once you've written the desired transformations, it is time to add them to the pipeline. The simplest way to do this is with Kubeflow's tfx/Transform component. Loading the component is the same as for the other TFX components, as illustrated in Example 5-6. Using this component is unique in requiring the transformation code to be passed in as a file uploaded to either S3 or GCS. It also needs the data: you can use the output from TFDV (if you used it) or load the examples as we did for TFDV. Using the TFT component is illustrated in Example 5-16.

Example 5-16.
    transformed_output = tfx_transform(
        input_data=records_example.output,
        schema=schema_op.outputs['output'],
        module_file=module_file)  # Path to your TFT code on GCS/S3

Now you have a machine learning pipeline with feature preparation along with a critical artifact to transform requests at serving time. The close integration of TensorFlow Transform can make model serving much less complicated. TensorFlow Transform with Kubeflow components isn't powerful enough for every project, so we'll look at distributed feature preparation next.

Distributed Data Using Apache Spark

Apache Spark is an open source distributed data processing tool that can run on a variety of clusters. Kubeflow supports Apache Spark through a few different components so you can access cloud-specific features. Since you may not be familiar with Spark, we'll briefly introduce Spark's Dataset/DataFrame APIs in the context of data and feature preparation. If you want to go beyond the basics, we recommend Learning Spark, Spark: The Definitive Guide, or High Performance Spark as resources to improve your Spark skills.

Note

Here our code is structured as a single stage for all of the feature and data preparation, since once you're at scale, writing and loading the data between steps is costly.
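The Spark examples in the rest of this section assume a SparkSession named session and an fs_prefix pointing at your object store or persistent volume; a minimal sketch of setting those up (the prefix is an assumption):

from pyspark.sql import SparkSession

# When launched through the Spark operator, most cluster configuration comes
# from the SparkApplication resource rather than from code.
session = SparkSession.builder.appName("mailing-list-data-prep").getOrCreate()
fs_prefix = "s3a://kf-book-examples/mailing-lists"  # Assumed path; adjust to your storage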

Spark Operators in Kubeflow

Using Kubeflow's native spark-operator, EMR, or Dataproc is best once you've moved beyond the experimental phase. The most portable is the native spark-operator, which does not depend on any specific cloud. To use any of the operators, you need to package your Spark program and store it either on a distributed filesystem (like GCS or S3) or inside a container.

If you're working in Python or R, we recommend building a Spark container so you can install your dependencies. With Scala or Java code, this is less critical. If you put the application inside the container, you can reference it with local:///. You can use the gcr.io/spark-operator/spark-py:v2.4.5 container as the base or build your own following the Spark on Kubernetes instructions (or see Chapter 9). Example 5-19 installs any requirements and copies the application into the container. If you decide to update the application, you can still use the container; just configure the main resource to point at a distributed filesystem. We cover building custom containers more in Chapter 9.

Example 5-19.
# Use the spark operator image as base
FROM gcr.io/spark-operator/spark-py:v2.4.5
# Install Python requirements
COPY requirements.txt /
RUN pip3 install -r /requirements.txt
# Now you can reference local:///job/my_file.py
RUN mkdir -p /job
COPY *.py /job

ENTRYPOINT ["/opt/entrypoint.sh"]

Two cloud-specific options for running Spark are the Amazon EMR or Google Dataproc components in Kubeflow. However, they each take different parameters meaning that you will need to translate your pipeline.

The EMR components allow you to set up clusters, submit jobs, and clean up the clusters. The two cluster task components are aws/emr/create_cluster for the start and aws/emr/delete_cluster for cleanup. The component for running a PySpark job is aws/emr/submit_pyspark_job. If you are not reusing an external cluster, it's important to trigger the delete component regardless of whether the submit_pyspark_job component succeeds.

While they have different parameters, the workflow for Dataproc clusters mirrors that of EMR. The components are similarly named, with gcp/dataproc/create_cluster/ and gcp/dataproc/delete_cluster/ for the lifecycle and gcp/dataproc/submit_pyspark_job/ for running our job.

Unlike the EMR and Dataproc components, the Spark operator does not have a component. For Kubernetes operators without components, you can use the dsl.ResourceOp to call them. Using the ResourceOp to launch a Spark job is illustrated in Example 5-20.

Example 5-20.
resource = {
    "apiVersion": "sparkoperator.k8s.io/v1beta2",
    "kind": "SparkApplication",
    "metadata": {
        "name": "boop",
        "namespace": "kubeflow"
    },
    "spec": {
        "type": "Python",
        "mode": "cluster",
        "image": "gcr.io/boos-demo-projects-are-rad/kf-steps/kubeflow/myspark",
        "imagePullPolicy": "Always",
        # See the Dockerfile OR use GCS/S3/...
        "mainApplicationFile": "local:///job/job.py",
        "sparkVersion": "2.4.5",
        "restartPolicy": {
            "type": "Never"
        },
        "driver": {
            "cores": 1,
            "coreLimit": "1200m",
            "memory": "512m",
            "labels": {
                "version": "2.4.5",
            },
            # also try spark-operatoroperator-sa
            "serviceAccount": "spark-operatoroperator-sa",
        },
        "executor": {
            "cores": 1,
            "instances": 2,
            "memory": "512m"
        },
        "labels": {
            "version": "2.4.5"
        },
    }
}


@dsl.pipeline(name="local Pipeline", description="No need to ask why.")
def local_pipeline():

    rop = dsl.ResourceOp(
        name="boop",
        k8s_resource=resource,
        action="create",
        success_condition="status.applicationState.state == COMPLETED")
Warning

Kubeflow doesn't apply any validation to ResourceOp requests. For example, in Spark the job name must be usable as the start of a valid DNS name; while container names in container ops are rewritten, ResourceOps just pass requests straight through.

Reading the Input Data

Spark supports a wide variety of data sources, including (but not limited to) Parquet, JSON, JDBC, ORC, Hive, CSV, ElasticSearch, MongoDB, Neo4j, Cassandra, Snowflake, Redis, and Riak Time Series.11 Loading data is very straightforward, and often all that is needed is specifying the format. For example, in our mailing list example, reading the Parquet-formatted output of our data preparation stage is done as in Example 5-25.

Example 5-25.
initial_posts = session.read.format("parquet").load(fs_prefix +
                                                    "/initial_posts")
ids_in_reply = session.read.format("parquet").load(fs_prefix + "/ids_in_reply")

If the above was instead formatted as JSON, we would only have to change "parquet" to "json."12

Validating the schema

We often believe we know the fields and types of our data. Spark can quickly discover the schema when our data is in a self-describing format like Parquet. In other formats, like JSON, the schema isn’t known until Spark reads the records. Regardless of the data format, it is good practice to specify the schema and ensure the data matches it, as shown in Example 5-26. Errors during data load are easier to debug than errors during model deployment.

Example 5-26.
from pyspark.sql.types import StructField, StructType, StringType

ids_schema = StructType([
    StructField("In-Reply-To", StringType(), nullable=True),
    StructField("message-id", StringType(), nullable=True)
])
ids_in_reply = session.read.format("parquet").schema(ids_schema).load(
    fs_prefix + "/ids_in_reply")

You can configure Spark to handle corrupted and non-conforming records by dropping them, keeping them, or stopping the process (i.e., failing the job). The default is permissive, which keeps the invalid records while setting the fields to null, allowing us to handle schema mismatches with the same techniques as for missing fields.
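For example, a sketch of failing fast instead of using the permissive default when reading JSON records (the mode option applies to text-based formats like JSON and CSV rather than Parquet; the JSON input path is an assumption):

ids_in_reply = (session.read.format("json")
                .schema(ids_schema)
                .option("mode", "FAILFAST")  # Or "DROPMALFORMED" to discard bad records
                .load(fs_prefix + "/ids_in_reply_json"))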

Handling missing fields

In many situations, some of our data is missing. You can choose to drop records with missing fields, fall back to secondary fields, impute averages, or leave as is. Spark’s built-in tools for these tasks are inside DataFrameNaFunctions. The correct solution depends on both your data and the algorithm you end up using. The most common is to drop records and make sure that we have not filtered out too many records, illustrated using the mailing list data in Example 5-27.

Example 5-27.
initial_posts_count = initial_posts.count()
initial_posts_cleaned = initial_posts.na.drop(how='any',
                                              subset=['body', 'from'])
initial_posts_cleaned_count = initial_posts_cleaned.count()
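A quick follow-on check of how much was dropped might look like this (the 10% threshold is an arbitrary assumption you should tune):

dropped_fraction = 1.0 - (initial_posts_cleaned_count / initial_posts_count)
if dropped_fraction > 0.1:  # Assumed threshold
    raise ValueError(
        f"Dropped {dropped_fraction:.1%} of records; check upstream data quality")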

Filtering out bad data

Detecting incorrect data can be challenging. However, without performing at least some data cleaning, the model may train on noise. Determining what counts as bad data often depends on the practitioner's domain knowledge of the problem.

A common technique supported in Spark is outlier removal. However, naively applying this can remove valid records. Using your domain experience, you can write a custom validation function and remove any records which don’t match it using the filter function in Spark as illustrated with our mailing list example in Example 5-28.

Example 5-28.
def is_ok(post):
    # Your special business logic goes here
    return True


spark_mailing_list_data_cleaned = spark_mailing_list_data_with_date.filter(
    is_ok)

Saving the output

Once you have the data ready, it’s time to save the output. If you’re going to use Apache Spark to do feature preparation, you can skip this step for now.

If you want to go back to single machine tools, it’s often simplest to save to a persistent volume. To do this, bring the data back to the main program by calling toPandas() as shown in Example 5-30. Now you can save the data in whatever format the next tool expects.

Example 5-30.
initial_posts.toPandas()
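For example, to hand the result off to the pandas-based feature preparation from earlier in the chapter, you might write it to the shared persistent volume (the path and key are assumptions, following the convention of the local clean_data stage):

local_posts = initial_posts.toPandas()
# Same HDF5 hand-off convention as the local clean_data stage.
local_posts.to_hdf("/data_processing/initial_posts.hdf", key="initial_posts")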

If the data is large, or you otherwise want to use an object store, Spark can write to many different formats (just as it can load from many different formats). The correct format depends on the tool you intend to use for feature preparation. Writing to Parquet is shown in Example 5-31.

Example 5-31.
initial_posts.write.format("parquet").mode('overwrite').save(fs_prefix +
                                                             "/initial_posts")
ids_in_reply.write.format("parquet").mode('overwrite').save(fs_prefix +
                                                            "/ids_in_reply")

Now you’ve seen a variety of different tools you can use to source and clean the data. We’ve looked at the flexibility of local tools, the scalability of distributed tools, and the integration from TensorFlow Extended. With the data in shape, let’s now make sure the right features are available and get them in a usable format for the machine learning model.

Distributed Feature Preparation Using Apache Spark

Apache Spark has a large number of built-in feature preparation tools, in pyspark.ml.feature, that you can use to generate features. You can use Spark in the same way as you did during data preparation. You may find using Spark’s own ML pipeline an easy way to put together multiple feature preparation stages.

For the Spark mailing list example, we have textual input data. To allow us to train a variety of models, converting this into word vectors is our preferred form of feature prep. Doing so involves first tokenizing the data with Spark’s Tokenizer. Once we have the tokens, we can train a word2vec model and produce our word vectors. Example 5-32 illustrates how to prepare features for the mailing list example using Spark.

Example 5-32.
from pyspark.ml.feature import (HashingTF, IDF, Tokenizer, VectorAssembler,
                                Word2Vec)

tokenizer = Tokenizer(inputCol="body", outputCol="body_tokens")
body_hashing = HashingTF(inputCol="body_tokens",
                         outputCol="raw_body_features",
                         numFeatures=10000)
body_idf = IDF(inputCol="raw_body_features", outputCol="body_features")
body_word2Vec = Word2Vec(vectorSize=5,
                         minCount=0,
                         numPartitions=10,
                         inputCol="body_tokens",
                         outputCol="body_vecs")
assembler = VectorAssembler(inputCols=[
    "body_features", "body_vecs", "contains_python_stack_trace",
    "contains_java_stack_trace", "contains_exception_in_task"
],
                            outputCol="features")
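As mentioned above, Spark's ML Pipeline can chain these stages so they are fit and applied as a unit; a minimal sketch (prepared_posts stands in for the cleaned DataFrame from the data preparation step):

from pyspark.ml import Pipeline

featurizer = Pipeline(stages=[
    tokenizer, body_hashing, body_idf, body_word2Vec, assembler])
# fit() learns the IDF weights and word2vec model; transform() adds the
# "features" column used for training.
features = featurizer.fit(prepared_posts).transform(prepared_posts)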

With this final distributed feature preparation example, you’re ready to scale up to handle larger data sizes if they ever come your way. If you’re working with smaller data, you’ve seen how you can use the same simple techniques of containerization to allow you to continue to work with your favourite tools. Either way, you’re almost ready for the next stage in the machine learning pipeline.

Putting It Together in a Pipeline

We have shown how to solve individual problems in data and feature preparation, but now we need to bring it all together. In our local example, we wrote our functions with the types and returned parameters to make it easy to put into a pipeline. Since we return the path of where our output is in each stage, we can use the function outputs to create the dependency graph for us. Putting these functions together into a pipeline is illustrated in Example 5-33.

Example 5-33.
@kfp.dsl.pipeline(name='Simple1', description='Simple1')
def my_pipeline_mini(year: int):
    dvop = dsl.VolumeOp(name="create_pvc",
                        resource_name="my-pvc-2",
                        size="5Gi",
                        modes=dsl.VOLUME_MODE_RWO)
    tldvop = dsl.VolumeOp(name="create_pvc",
                          resource_name="tld-volume-2",
                          size="100Mi",
                          modes=dsl.VOLUME_MODE_RWO)
    download_data_op = kfp.components.func_to_container_op(
        download_data, packages_to_install=['lxml', 'requests'])
    download_tld_info_op = kfp.components.func_to_container_op(
        download_tld_data,
        packages_to_install=['requests', 'pandas>=0.24', 'tables'])
    clean_data_op = kfp.components.func_to_container_op(
        clean_data, packages_to_install=['pandas>=0.24', 'tables'])

    step1 = download_data_op(year).add_pvolumes(
        {"/data_processing": dvop.volume})
    step2 = clean_data_op(input_path=step1.output).add_pvolumes(
        {"/data_processing": dvop.volume})
    step3 = download_tld_info_op().add_pvolumes({"/tld_info": tldvop.volume})


kfp.compiler.Compiler().compile(my_pipeline_mini, 'local-data-prep-2.zip')

The feature preparation step (shown in Example 5-34) follows the same general pattern as the other local components. However, the libraries that we need for feature preparation are a bit different, so we've changed the packages_to_install value to install scikit-learn.

Example 5-34.
    df['domains'] = df['links'].apply(extract_domains)
    df['isThreadStart'] = df['depth'] == '0'

    # Arguably, you could split building the dataset away from the actual witchcraft.
    from sklearn.feature_extraction.text import TfidfVectorizer

    bodyV = TfidfVectorizer()
    bodyFeatures = bodyV.fit_transform(df['body'])

    domainV = TfidfVectorizer()

    ## A couple of "None" domains really screwed the pooch on this one.
    ## Also, no lists, just space-separated domains.
    def makeDomainsAList(d):
        return ' '.join([a for a in d if a is not None])

    domainFeatures = domainV.fit_transform(
        df['domains'].apply(makeDomainsAList))

    from scipy.sparse import csr_matrix, hstack

    data = hstack([
        csr_matrix(df[[
            'containsPythonStackTrace', 'containsJavaStackTrace',
            'containsExceptionInTaskBody', 'isThreadStart'
        ]].to_numpy()), bodyFeatures, domainFeatures
    ])
Tip

When you start exploring a new dataset, you may find it easier to use a notebook as usual, without the pipeline components. When possible, following the same general structure you would use with pipelines will make it easier to productionize your exploratory work.

These steps don't specify the container to use. To use the SpamAssassin container you just built, you specify it as in Example 5-35.

Example 5-35.
clean_data_op = kfp.components.func_to_container_op(
    clean_data,
    base_image="{0}/kubeflow/spammassisan".format(container_registry),
    packages_to_install=['pandas>=0.24', 'tables'])

Sometimes the cost of writing our data out between stages is too expensive. In our recommender example, unlike in the mailing list example, we've chosen to put data and feature preparation together into a single pipeline stage. In our distributed mailing list example, we build one single Spark job as well. In these cases, all of our work so far is just one stage. Using a single stage allows us to avoid writing the file out in between, but it can complicate debugging.

Using an Entire Notebook as a Dataprep Pipeline Stage

If you don’t want to turn the individual parts of the data preparation notebook into a pipeline, you can use the entire notebook as one stage. You can use the same containers used by JupyterHub to run the notebook programmatically.

To do this, you'll need to make a new Dockerfile, specify that it is based on top of another container using FROM, and then add a COPY directive to package the notebook inside the new container. Since the census data example has a pre-existing notebook, that's the approach we've taken in Example 5-36.

Example 5-36.
FROM gcr.io/kubeflow-images-public/tensorflow-1.6.0-notebook-cpu

COPY ./ /workdir/

If you require additional Python dependencies, you can use the RUN directive to install them. Putting the dependencies in the container can help speed up the pipeline, especially for complicated packages. For our mailing list example, the Dockerfile would look like Example 5-37.

Example 5-37.
RUN pip3 install --upgrade lxml pandas

We can use this container like any other in the pipeline with dsl.ContainerOp, as we did with the recommender example in the pipeline chapter. You now have two ways to use notebooks in Kubeflow, and we'll cover options beyond notebooks next.

Tip

Does the notebook need GPU resources? When specifying the dsl.ContainerOp, add a call to set_gpu_limit and specify either nvidia or amd, depending on the desired GPU type.
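A sketch of what that looks like (the image name is a placeholder):

    notebook_step = dsl.ContainerOp(
        name="dataprep-notebook",
        image="my-registry/my-notebook-image")  # Placeholder image
    notebook_step.set_gpu_limit(1, vendor="nvidia")  # Or vendor="amd"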

Conclusion

Now you have your data ready to train a model. We've seen that there is no one-size-fits-all approach to data and feature preparation; our different examples needed different tooling. We've also seen how the methods can change within the same problem, as when we expanded the scope of the mailing list example to include more data. The amount and quality of the features, and of the data used to produce them, are critical to the success of machine learning projects. You can test this by running the examples with smaller data sizes and comparing the models.

It's also important to remember that data and feature preparation is not a one-and-done activity; you may want to revisit this step as you develop your model. You may find that there is a feature you wish you had, or that a feature you thought would perform well isn't, suggesting data quality issues. In the coming chapters, as we train and serve our models, feel free to revisit the data and feature preparation.

1 See the TFX docs (https://www.tensorflow.org/tfx) for a good summary if you're new to data preparation.

2 The positive impact of using more data in training is made clear in "The Unreasonable Effectiveness of Data" by Halevy, Norvig, and Pereira, and "More Data Beats Better Algorithms" by Tyler Schnoebelen.

3 For the formal definition see https://www.kdnuggets.com/2018/12/six-steps-master-machine-learning-data-preparation.html.

4 There are too many tools to cover here, but the blog post at https://solutionsreview.com/data-integration/the-best-data-preparation-tools-and-software/ includes many.

5 Datasets tend to grow over time rather than shrink; starting with distributed tooling can help you scale your work.

6 See this post on some common techniques for imputing missing data: https://www.theanalysisfactor.com/seven-ways-to-make-up-data-common-methods-to-imputing-missing-data/.

7 Have some VB6 code you really need to run? Check out the chapter on going beyond TensorFlow and make a small sacrifice of wine.

8 There is a compatibility matrix at https://beam.apache.org/documentation/runners/capability-matrix/, although currently Beam’s Python support requires launching an additional Docker container, making support on Kubernetes more complicated.

9 While TFX automatically installs TFDV, if you have an old installation and don't specify tensorflow-data-validation, you may get a "Could not find a version that satisfies the requirement" error, so we illustrate explicitly installing both here.

10 While technically not a file format, since TFX can accept pandas’ dataframes a common pattern is to load with pandas first.

11 There is no definitive list, although many vendors list their formats on https://spark-packages.org/.

12 Of course, since most formats have slight variations, they have configuration options if the defaults don't work.
