6

Scaling Up Prediction to Terabyte Click Logs

In the previous chapter, we developed an ad click-through predictor using a logistic regression classifier. We demonstrated that the algorithm is highly scalable by training it efficiently on up to 1 million click log samples. In this chapter, we will further boost the scalability of the ad click-through predictor by utilizing a powerful parallel computing (or, more specifically, distributed computing) tool called Apache Spark.

This chapter will demystify how Apache Spark is used to scale up learning on massive data, as opposed to limiting model learning to one single machine. We will also use PySpark, the Python API for Spark, to explore the click log data, to develop classification solutions based on the entire click log dataset, and to evaluate performance, all in a distributed manner. Aside from this, I will introduce two approaches to working with categorical features: one is related to hashing in computer science, while the other fuses multiple features. They will be implemented in Spark as well.

In this chapter, we will cover the following topics:

  • The main components of Apache Spark
  • Spark installation
  • Deploying a Spark application
  • Fundamental data structures in PySpark
  • Core programming in PySpark
  • The implementations of ad click-through predictions in PySpark
  • Data exploratory analysis in PySpark
  • Caching and persistence in Spark
  • Feature hashing and its implementations in PySpark
  • Feature interaction and its implementations in PySpark

Learning the essentials of Apache Spark

Apache Spark is a distributed cluster computing framework designed for fast and general-purpose computation. It is an open-source technology originally developed by the AMPLab at the University of California, Berkeley. It provides an easy-to-use interface for programming interactive queries and processing streaming data. What makes it a popular big data analytics tool is its implicit data parallelism, where it automates operations on data in parallel across processors in the computing cluster. Users only need to focus on how they want to manipulate the data, without worrying about how it is distributed among all the computing nodes or which part of the data a node is responsible for.

Bear in mind that this book is mainly about machine learning. Hence, we will only briefly cover the fundamentals of Spark, including its components, installation, deployment, data structure, and core programming.

Breaking down Spark

We will start with the main components of Spark, which are depicted in the following diagram:

Figure 6.1: The main components of Spark

Let's discuss them in more detail:

  • Spark Core: This is the foundation and the execution engine of the overall platform. It provides task distribution, scheduling, and in-memory computing. As its name implies, Spark Core is what all the other functionalities are built on top of. It can also be exposed through the APIs of multiple languages, including Python, Java, Scala, and R.
  • Spark SQL: This is a component built upon Spark Core that introduces a high-level data abstraction called DataFrames. We will talk about data structures in Spark later. Spark SQL supports SQL-like data manipulation in Python, Java, and Scala, which works great with structured and semi-structured data. We will be using modules from Spark SQL in this chapter.
  • Spark Streaming: This performs real-time (or nearly real-time) data analytics by leveraging Spark Core's fast scheduling and in-memory computing capabilities.
  • MLlib: Short for machine learning library, this is a distributed machine learning framework built on top of Spark Core. It allows for learning on large-scale data efficiently thanks to its distributed architecture and in-memory computing capabilities. In in-memory computation, data is kept in random access memory (RAM) where capacity allows, instead of on disk. This largely avoids the cost of reloading data from disk back and forth during an iterative process. Training a machine learning model is basically an iterative learning process, which makes the in-memory computing capability of Spark extremely applicable to machine learning modeling. According to major performance benchmarks, learning using MLlib is nearly 10 times as fast as a disk-based solution. In this chapter, we will be using modules from Spark MLlib.
  • GraphX: This is another functionality built on top of Spark Core that focuses on distributed graph-based processing. PageRank and Pregel abstraction are two typical use cases.

    The main goal of this section is to understand Spark as a distributed cluster computing framework designed for fast computation, which facilitates both data analytics and iterative learning. If you are looking for more detailed information on Spark, there is a lot of useful documentation and tutorials available online, such as https://spark.apache.org/docs/latest/quick-start.html.

Installing Spark

For learning purposes, let's now install Spark on the local computer (even though it is more frequently used in a cluster of servers). Full instructions can be found at https://spark.apache.org/downloads.html. Several versions are available; we will take version 2.4.5 (Feb 05, 2020), pre-built for Apache Hadoop 2.7, as an example.

At the time of writing, the latest stable version is 2.4.5. Although there is a preview version, 3.0.0, I think the latest stable version is enough to start with. You won't notice much difference between 3.0.0 and 2.4.5 when going through this chapter. Please note that the pyspark.ml.feature.OneHotEncoderEstimator module has been deprecated and removed in version 3.0.0 and above; its functionality has been merged into pyspark.ml.feature.OneHotEncoder.
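
If you do move to Spark 3.x at some point, the estimator-style encoding we rely on later in this chapter is available directly from OneHotEncoder. Here is a minimal sketch; the column names are placeholders for the indexed columns we will create later in this chapter:

>>> # Spark 3.x only; the column names below are placeholders
>>> from pyspark.ml.feature import OneHotEncoder
>>> encoder = OneHotEncoder(inputCols=["C1_indexed", "banner_pos_indexed"],
...                         outputCols=["C1_encoded", "banner_pos_encoded"])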

As illustrated in the following screenshot, after selecting 2.4.5 in step 1, we choose the Pre-built for Apache Hadoop 2.7 option in step 2. Then, we click the link in step 3 to download the spark-2.4.5-bin-hadoop2.7.tgz file:

Figure 6.2: Steps to download Spark

Unzip the downloaded file. The resulting folder contains a complete Spark package; you don't need to do any extra installation.

Before running any Spark program, we need to make sure the following dependencies are installed:

  • Java 8+, and that it is included in the system environment variables
  • Scala version 2.11

To check whether Spark is installed properly, we run the following tests:

  1. First, we approximate the value of π using Spark by typing in the following command in Terminal (note that bin is a folder in spark-2.4.5-bin-hadoop2.7, so remember to run the following commands inside this folder):
    ./bin/run-example SparkPi 10
    
  2. It should print out something similar to the following (the values may differ):
    Pi is roughly 3.141851141851142
    

    This test is actually similar to the following:

    ./bin/spark-submit examples/src/main/python/pi.py 10
    
  3. Next, we test the interactive shell with the following command:
    ./bin/pyspark --master local[2]
    

    This should open a Python interpreter, as shown in the following screenshot:

Figure 6.3: Running Spark in the shell

By now, the Spark program should be installed properly. We will talk about the commands (pyspark and spark-submit) in the following sections.

Launching and deploying Spark programs

A Spark program can run by itself or over cluster managers. The first option is similar to running a program locally with multiple threads, where one thread is considered one Spark job worker. Of course, there is no cross-machine parallelism, but it is a quick and easy way to launch a Spark application, and we will be deploying it in this mode by way of demonstration throughout this chapter. For example, we can run the following script to launch a Spark application:

     ./bin/spark-submit examples/src/main/python/pi.py

This is precisely what we did in the previous section. Or, we can specify the number of threads:

    ./bin/spark-submit --master local[4] examples/src/main/python/pi.py

In the preceding command, we run Spark locally with four worker threads. To use as many worker threads as there are cores on the machine, we run the following command:

    ./bin/spark-submit --master local[*] examples/src/main/python/pi.py

Similarly, we can launch the interactive shell by using pyspark instead of spark-submit:

     ./bin/pyspark --master local[2]

As for the cluster mode, it (version 2.4.5) currently supports the following approaches:

  • Standalone: This is the simplest mode in which to launch a Spark application. It uses Spark's own built-in cluster manager, with a master and a number of workers launched as Spark processes. Details of how to launch a Spark application in standalone cluster mode can be found at the following link: https://spark.apache.org/docs/latest/spark-standalone.html.
  • Apache Mesos: As a centralized and fault-tolerant cluster manager, Mesos is designed for managing distributed computing environments. In Spark, when a driver submits tasks for scheduling, Mesos determines which machines handle which tasks. Refer to https://spark.apache.org/docs/latest/running-on-mesos.html for further details.
  • Apache Hadoop YARN: The task scheduler in this approach becomes YARN, as opposed to Mesos in the previous one. YARN, which is short for Yet Another Resource Negotiator, is the resource manager in Hadoop. With YARN, Spark can be integrated into the Hadoop ecosystem (such as MapReduce, Hive, and File System) more easily. For more information, please go to the following link: https://spark.apache.org/docs/latest/running-on-yarn.html.
  • Kubernetes: This is an open-source system that provides container-centric infrastructure. It helps automate job deployment and management and has gained in popularity in recent years. Kubernetes for Spark is still pretty new, but if you are interested, feel free to read more at the following link: https://spark.apache.org/docs/latest/running-on-kubernetes.html.

It's easy to launch and deploy a Spark application. How about coding in PySpark? Let's see in the next section.

Programming in PySpark

This section provides a quick introduction to programming with Python in Spark. We will start with the basic data structures in Spark.

The Resilient Distributed Dataset (RDD) is the primary data structure in Spark. An RDD is a distributed collection of objects and has the following three main features:

  • Resilient: When any node fails, affected partitions will be reassigned to healthy nodes, which makes Spark fault-tolerant
  • Distributed: Data resides on one or more nodes in a cluster, which can be operated on in parallel
  • Dataset: This contains a collection of partitioned data with their values or metadata

RDD was the main data structure in Spark before version 2.0. After that, it was replaced by the DataFrame, which is also a distributed collection of data but organized into named columns. DataFrames utilize the optimized execution engine of Spark SQL. Therefore, they are conceptually similar to a table in a relational database or a DataFrame object in the Python pandas library.

Although the current version of Spark still supports RDD, programming with DataFrames is highly recommended. Hence, we won't spend too much time on programming with RDD. Refer to https://spark.apache.org/docs/latest/rdd-programming-guide.html if you are interested.
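
Just to give a flavor of the RDD API, here is a minimal sketch you can run in the PySpark shell, where the SparkContext object, sc, is predefined (a toy example, not something we will need later):

>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> # Square each element, then sum the squares across partitions
>>> rdd.map(lambda x: x * x).reduce(lambda x, y: x + y)
55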

The entry point to a Spark program is creating a Spark session, which can be done by using the following lines:

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession \
...     .builder \
...     .appName("test") \
...     .getOrCreate()

Note that this is not needed if we run it in a PySpark shell. Right after we spin up a PySpark shell (with ./bin/pyspark), a Spark session is automatically created. We can check the running Spark application at the following link: localhost:4040/jobs/. Refer to the following screenshot for the resulting page:

Figure 6.4: Spark application UI

With a Spark session, spark, a DataFrame object can be created by reading a file (which is usually the case) or manual input. In the following example, we will create a DataFrame object, df, from a CSV file:

>>> df = spark.read.csv("examples/src/main/resources/people.csv", 
                                           header=True, sep=';')

Columns in the CSV file people.csv are separated by ;.

Once this is done, we will see a completed job in localhost:4040/jobs/:

Figure 6.5: Completed job list in the Spark application

We can display the contents of the df object by using the following command:

>>> df.show()
+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+

We can count the number of rows by using the following command:

>>> df.count()
2

The schema of the df object can be displayed using the following command:

>>> df.printSchema()
root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- job: string (nullable = true)

One or more columns can be selected as follows:

>>> df.select("name").show()
+-----+
| name|
+-----+
|Jorge|
|  Bob|
+-----+
>>> df.select(["name", "job"]).show()
+-----+---------+
| name|      job|
+-----+---------+
|Jorge|Developer|
|  Bob|Developer|
+-----+---------+

We can filter rows by condition, for instance, by the value of one column, using the following command:

>>> df.filter(df['age'] > 31).show()
+----+---+---------+
|name|age|      job|
+----+---+---------+
| Bob| 32|Developer|
+----+---+---------+
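
Besides reading from files, a DataFrame can also be built from in-memory data with createDataFrame. Here is a quick sketch, with df2 used purely as an illustrative name:

>>> df2 = spark.createDataFrame([("Jorge", 30, "Developer"),
...                              ("Bob", 32, "Developer")],
...                             ["name", "age", "job"])
>>> df2.show()
+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+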

We will continue programming in PySpark in the next section, where we will use Spark to solve the ad click-through problem.

Learning on massive click logs with Spark

Normally, in order to take advantage of Spark, data is stored using the Hadoop Distributed File System (HDFS), which is a distributed file system designed to store large volumes of data, and computation occurs over multiple nodes on clusters. For demonstration purposes, we will keep the data on a local machine and run Spark locally. From a programming perspective, this is no different from running it on a distributed computing cluster.

Loading click logs

To train a model on massive click logs, we first need to load the data in Spark. We do so by taking the following steps:

  1. We spin up the PySpark shell by using the following command:
    ./bin/pyspark --master local[*]  --driver-memory 20G
    

    Here, we specify a large driver memory as we are dealing with a dataset of more than 6 GB.

    A driver program is responsible for collecting and storing processed results from executors. So, a large driver memory helps complete jobs where lots of data is processed.

  2. Next, we start a Spark session with an application named CTR:
    >>> spark = SparkSession \
    ...     .builder \
    ...     .appName("CTR") \
    ...     .getOrCreate()
    
  3. Then, we load the click log data from the train file into a DataFrame object. Note that the data load function spark.read.csv allows custom schemas, which guarantees data is loaded as expected, as opposed to automatically inferring schemas. So, first, we define the schema:
    >>> from pyspark.sql.types import StructField, StringType, \
    ...     StructType, IntegerType
    >>> schema = StructType([
    ...     StructField("id", StringType(), True),
    ...     StructField("click", IntegerType(), True),
    ...     StructField("hour", IntegerType(), True),
    ...     StructField("C1", StringType(), True),
    ...     StructField("banner_pos", StringType(), True),
    ...     StructField("site_id", StringType(), True),
    ...     StructField("site_domain", StringType(), True),
    ...     StructField("site_category", StringType(), True),
    ...     StructField("app_id", StringType(), True),
    ...     StructField("app_domain", StringType(), True),
    ...     StructField("app_category", StringType(), True),
    ...     StructField("device_id", StringType(), True),
    ...     StructField("device_ip", StringType(), True),
    ...     StructField("device_model", StringType(), True),
    ...     StructField("device_type", StringType(), True),
    ...     StructField("device_conn_type", StringType(), True),
    ...     StructField("C14", StringType(), True),
    ...     StructField("C15", StringType(), True),
    ...     StructField("C16", StringType(), True),
    ...     StructField("C17", StringType(), True),
    ...     StructField("C18", StringType(), True),
    ...     StructField("C19", StringType(), True),
    ...     StructField("C20", StringType(), True),
    ...     StructField("C21", StringType(), True),
    ... ])
    

    Each field of the schema contains the name of the column (such as id, click, or hour), the data type (such as integer or string), and whether missing values are allowed (allowed, in this case).

  4. With the defined schema, we create a DataFrame object, df:
    >>> df = spark.read.csv("file://path_to_file/train", schema=schema, 
                                                          header=True)
    

    Remember to replace path_to_file with the absolute path of where the train data file is located. The file:// prefix indicates that data is read from a local file. Another prefix, hdfs://, is used for data stored in HDFS.

  5. We now double-check the schema as follows:
    >>> df.printSchema()
    root
     |-- id: string (nullable = true)
     |-- click: integer (nullable = true)
     |-- hour: integer (nullable = true)
     |-- C1: string (nullable = true)
     |-- banner_pos: string (nullable = true)
     |-- site_id: string (nullable = true)
     |-- site_domain: string (nullable = true)
     |-- site_category: string (nullable = true)
     |-- app_id: string (nullable = true)
     |-- app_domain: string (nullable = true)
     |-- app_category: string (nullable = true)
     |-- device_id: string (nullable = true)
     |-- device_ip: string (nullable = true)
     |-- device_model: string (nullable = true)
     |-- device_type: string (nullable = true)
     |-- device_conn_type: string (nullable = true)
     |-- C14: string (nullable = true)
     |-- C15: string (nullable = true)
     |-- C16: string (nullable = true)
     |-- C17: string (nullable = true)
     |-- C18: string (nullable = true)
     |-- C19: string (nullable = true)
     |-- C20: string (nullable = true)
     |-- C21: string (nullable = true)
    
  6. And the data size is checked as follows:
    >>> df.count()
    40428967
    
  7. Also, we need to drop several columns that provide little information. We use the following code to do that:
    >>> df = df.drop('id').drop('hour').drop('device_id').drop('device_ip')
    
  8. We rename the column from click to label, as this will be consumed more often in the downstream operations:
    >>> df = df.withColumnRenamed("click", "label")
    
  9. Let's look at the current columns in the DataFrame object:
    >>> df.columns
    ['label', 'C1', 'banner_pos', 'site_id', 'site_domain', 'site_category', 'app_id', 'app_domain', 'app_category', 'device_model', 'device_type', 'device_conn_type', 'C14', 'C15', 'C16', 'C17', 'C18', 'C19', 'C20', 'C21']
    

After inspecting the input data, we need to split and cache the data.

Splitting and caching the data

Here, we split the data into a training set and testing set, as follows:

>>> df_train, df_test = df.randomSplit([0.7, 0.3], 42)

In this case, 70% of the samples are used for training and the remaining samples are used for testing, with a random seed specified, as always, for reproducibility.

Before we perform any heavy lifting (such as model learning) on the training set, df_train, it is good practice to cache the object. In Spark, caching and persistence are optimization techniques that reduce the computation overhead. This saves the intermediate results of RDD or DataFrame operations in memory and/or on disk. Without caching or persistence, whenever an intermediate DataFrame is needed, it will be recalculated again according to how it was created originally. Depending on the storage level, persistence behaves differently:

  • MEMORY_ONLY: The object is only stored in memory. If it does not fit in memory, the remaining part will be recomputed each time it is needed.
  • DISK_ONLY: The object is only kept on disk. A persisted object can be extracted directly from storage without being recalculated.
  • MEMORY_AND_DISK: The object is stored in memory, and might be on disk as well. If the full object does not fit in memory, the remaining partition will be stored on disk, instead of being recalculated every time it is needed. This is the default mode for caching and persistence in Spark. It takes advantage of both the fast retrieval of in-memory storage and the high accessibility and capacity of disk storage.

In PySpark, caching is simple. All that is required is a call to the cache method.
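
For DataFrames, cache uses the default MEMORY_AND_DISK level. If you want a different storage level, you can call persist with an explicit StorageLevel instead; here is a minimal sketch on a throwaway DataFrame:

>>> from pyspark import StorageLevel
>>> df_tmp = spark.range(10)
>>> # Keep this DataFrame on disk only, then release it
>>> df_tmp.persist(StorageLevel.DISK_ONLY)
DataFrame[id: bigint]
>>> df_tmp.unpersist()
DataFrame[id: bigint]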

Let's cache both the training and testing DataFrames:

>>> df_train.cache()
DataFrame[label: int, C1: string, banner_pos: string, site_id: string, site_domain: string, site_category: string, app_id: string, app_domain: string, app_category: string, device_model: string, device_type: string, device_conn_type: string, C14: string, C15: string, C16: string, C17: string, C18: string, C19: string, C20: string, C21: string]
>>> df_train.count()
28297027
>>> df_test.cache()
DataFrame[label: int, C1: string, banner_pos: string, site_id: string, site_domain: string, site_category: string, app_id: string, app_domain: string, app_category: string, device_model: string, device_type: string, device_conn_type: string, C14: string, C15: string, C16: string, C17: string, C18: string, C19: string, C20: string, C21: string]
>>> df_test.count()
12131940

Now, we have the training and testing data ready for downstream analysis.

One-hot encoding categorical features

Similar to the previous chapter, we need to encode categorical features into sets of multiple binary features by executing the following steps:

  1. In our case, the categorical features include the following:
    >>> categorical = df_train.columns
    >>> categorical.remove('label')
    >>> print(categorical)
    ['C1', 'banner_pos', 'site_id', 'site_domain', 'site_category', 'app_id', 'app_domain', 'app_category', 'device_model', 'device_type', 'device_conn_type', 'C14', 'C15', 'C16', 'C17', 'C18', 'C19', 'C20', 'C21']
    

    In PySpark, one-hot encoding is not as direct as it is in scikit-learn (specifically, with the OneHotEncoder module).

  2. We need to index each categorical column using the StringIndexer module:
    >>> from pyspark.ml.feature import StringIndexer
    >>> indexers = [
    ...     StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
    ...     .setHandleInvalid("keep")
    ...     for c in categorical
    ... ]
    

    The setHandleInvalid("keep") handle makes sure the application won't crash if any new categorical value occurs. Try to omit it; you will see error messages related to unknown values.

  3. Then, we perform one-hot encoding on each individual indexed categorical column using the OneHotEncoderEstimator module:
    >>> from pyspark.ml.feature import OneHotEncoderEstimator
    >>> encoder = OneHotEncoderEstimator(
    ...     inputCols=[indexer.getOutputCol() for indexer in indexers],
    ...     outputCols=["{0}_encoded".format(indexer.getOutputCol())
    ...                 for indexer in indexers]
    ... )
    
  4. Next, we concatenate all sets of generated binary vectors into a single one using the VectorAssembler module:
    >>> from pyspark.ml.feature import VectorAssembler
    >>> assembler = VectorAssembler(
    ...                     inputCols=encoder.getOutputCols(),
    ...                     outputCol="features"
    ... )
    

    This creates the final encoded vector column called features.

  5. We chain all these three stages together into a pipeline with the Pipeline module in PySpark, which better organizes our one-hot encoding workflow:
    >>> stages = indexers + [encoder, assembler]
    >>> from pyspark.ml import Pipeline
    >>> pipeline = Pipeline(stages=stages)
    

    The variable stages is a list of operations needed for encoding.

  6. Finally, we can fit the pipeline one-hot encoding model over the training set:
    >>> one_hot_encoder = pipeline.fit(df_train)
    
  7. Once this is done, we use the trained encoder to transform both the training and testing sets. For the training set, we use the following code:
    >>> df_train_encoded = one_hot_encoder.transform(df_train)
    >>> df_train_encoded.show()
    
  8. At this point, we skip displaying the results as there are dozens of columns with several additional ones added on top of df_train. However, we can see the one we are looking for, the features column, which contains the one-hot encoded results. Hence, we only select this column, along with the target variable:
    >>> df_train_encoded = df_train_encoded.select(["label", "features"])
    >>> df_train_encoded.show()
    +-----+--------------------+
    |label|            features|
    +-----+--------------------+
    |    0|(31458,[5,7,3527,...|
    |    0|(31458,[5,7,788,4...|
    |    0|(31458,[5,7,788,4...|
    |    0|(31458,[5,7,788,4...|
    |    0|(31458,[5,7,788,4...|
    |    0|(31458,[5,7,788,4...|
    |    0|(31458,[5,7,788,4...|
    |    0|(31458,[5,7,788,4...|
    |    0|(31458,[5,7,788,4...|
    |    0|(31458,[5,7,788,4...|
    |    0|(31458,[5,7,788,4...|
    |    0|(31458,[5,7,788,4...|
    |    0|(31458,[5,7,788,4...|
    |    0|(31458,[5,7,1271,...|
    |    0|(31458,[5,7,1271,...|
    |    0|(31458,[5,7,1271,...|
    |    0|(31458,[5,7,1271,...|
    |    0|(31458,[5,7,1532,...|
    |    0|(31458,[5,7,4366,...|
    |    0|(31458,[5,7,14,45...|
    +-----+--------------------+
    only showing top 20 rows
    

    The features column contains sparse vectors of size 31,458.

  9. Don't forget to cache df_train_encoded, as we will be using it to iteratively train our classification model:
    >>> df_train_encoded.cache()
    DataFrame[label: int, features: vector]
    
  10. To release some space, we uncache df_train, since we will no longer need it:
    >>> df_train.unpersist()
    DataFrame[label: int, C1: string, banner_pos: string, site_id: string, site_domain: string, site_category: string, app_id: string, app_domain: string, app_category: string, device_model: string, device_type: string, device_conn_type: string, C14: string, C15: string, C16: string, C17: string, C18: string, C19: string, C20: string, C21: string]
    
  11. Now, we repeat the preceding steps for the testing set:
    >>> df_test_encoded = one_hot_encoder.transform(df_test)
    >>> df_test_encoded = df_test_encoded.select(["label", "features"])
    >>> df_test_encoded.show()
    +-----+--------------------+
    |label|            features|
    +-----+--------------------+
    |    0|(31458,[5,7,788,4...|
    |    0|(31458,[5,7,788,4...|
    |    0|(31458,[5,7,788,4...|
    |    0|(31458,[5,7,788,4...|
    |    0|(31458,[5,7,788,4...|
    |    0|(31458,[5,7,14,45...|
    |    0|(31458,[5,7,14,45...|
    |    0|(31458,[5,7,14,45...|
    |    0|(31458,[5,7,14,45...|
    |    0|(31458,[5,7,14,45...|
    |    0|(31458,[5,7,14,45...|
    |    0|(31458,[5,7,14,45...|
    |    0|(31458,[5,7,14,45...|
    |    0|(31458,[5,7,14,45...|
    |    0|(31458,[5,7,14,45...|
    |    0|(31458,[5,7,14,45...|
    |    0|(31458,[5,7,14,45...|
    |    0|(31458,[5,7,14,45...|
    |    0|(31458,[5,7,2859,...|
    |    0|(31458,[1,7,651,4...|
    +-----+--------------------+
    only showing top 20 rows
    >>> df_test_encoded.cache()
    DataFrame[label: int, features: vector]
    >>> df_test.unpersist()
    DataFrame[label: int, C1: string, banner_pos: string, site_id: string, site_domain: string, site_category: string, app_id: string, app_domain: string, app_category: string, device_model: string, device_type: string, device_conn_type: string, C14: string, C15: string, C16: string, C17: string, C18: string, C19: string, C20: string, C21: string]
    
  12. If you check the Spark UI localhost:4040/jobs/ in your browser, you will see several completed jobs, such as the following:

Figure 6.6: List of jobs completed after encoding

With the encoded training and testing sets ready, we can now train our classification model.

Training and testing a logistic regression model

We will use logistic regression as our example, but there are many other classification models supported in PySpark, such as decision tree classifiers, random forests, neural networks (which we will be studying in Chapter 8, Predicting Stock Prices with Artificial Neural Networks), linear SVM, and Naïve Bayes. For further details, please refer to the following link: https://spark.apache.org/docs/latest/ml-classification-regression.html#classification.

We can train and test a logistic regression model by using the following steps:

  1. We first import the logistic regression module and initialize a model:
    >>> from pyspark.ml.classification import LogisticRegression
    >>> classifier = LogisticRegression(maxIter=20, regParam=0.001,
    ...                                 elasticNetParam=0.001)
    

    Here, we set the maximum number of iterations to 20, the regularization strength (regParam) to 0.001, and the ElasticNet mixing parameter (elasticNetParam) to 0.001, which blends L1 and L2 regularization.

  2. Now, we fit the model on the encoded training set:
    >>> lr_model = classifier.fit(df_train_encoded)
    

    Be aware that this might take a while. You can check the running or completed jobs in the Spark UI in the meantime. Refer to the following screenshot for some completed jobs:

    Figure 6.7: List of jobs completed after training

    Note that each RDDLossFunction represents an iteration of optimizing the logistic regression classifier.

  3. After all iterations, we apply the trained model on the testing set:
    >>> predictions = lr_model.transform(df_test_encoded)
    
  4. We cache the prediction results, as we will compute the prediction's performance:
    >>> predictions.cache()
    DataFrame[label: int, features: vector, rawPrediction: vector, probability: vector, prediction: double]

    Take a look at the prediction DataFrame:

    >>> predictions.show()
    +-----+--------------------+--------------------+--------------------+----------+
    |label|            features|       rawPrediction|         probability|prediction|
    +-----+--------------------+--------------------+--------------------+----------+
    |    0|(31458,[5,7,788,4...|[2.80267740289335...|[0.94282033454271...|       0.0|
    |    0|(31458,[5,7,788,4...|[2.72243908463177...|[0.93833781006061...|       0.0|
    |    0|(31458,[5,7,788,4...|[2.72243908463177...|[0.93833781006061...|       0.0|
    |    0|(31458,[5,7,788,4...|[2.82083664358057...|[0.94379146612755...|       0.0|
    |    0|(31458,[5,7,788,4...|[2.82083664358057...|[0.94379146612755...|       0.0|
    |    0|(31458,[5,7,14,45...|[4.44920221201642...|[0.98844714081261...|       0.0|
    |    0|(31458,[5,7,14,45...|[4.44920221201642...|[0.98844714081261...|       0.0|
    |    0|(31458,[5,7,14,45...|[4.44920221201642...|[0.98844714081261...|       0.0|
    |    0|(31458,[5,7,14,45...|[4.54759977096521...|[0.98951842852058...|       0.0|
    |    0|(31458,[5,7,14,45...|[4.54759977096521...|[0.98951842852058...|       0.0|
    |    0|(31458,[5,7,14,45...|[4.38991492595212...|[0.98775013592573...|       0.0|
    |    0|(31458,[5,7,14,45...|[4.38991492595212...|[0.98775013592573...|       0.0|
    |    0|(31458,[5,7,14,45...|[4.38991492595212...|[0.98775013592573...|       0.0|
    |    0|(31458,[5,7,14,45...|[4.38991492595212...|[0.98775013592573...|       0.0|
    |    0|(31458,[5,7,14,45...|[5.58870435258071...|[0.99627406423617...|       0.0|
    |    0|(31458,[5,7,14,45...|[5.66066729150822...|[0.99653187592454...|       0.0|
    |    0|(31458,[5,7,14,45...|[5.66066729150822...|[0.99653187592454...|       0.0|
    |    0|(31458,[5,7,14,45...|[5.61336061100621...|[0.99636447866332...|       0.0|
    |    0|(31458,[5,7,2859,...|[5.47553763410082...|[0.99582948965297...|       0.0|
    |    0|(31458,[1,7,651,4...|[1.33424801682849...|[0.79154243844810...|       0.0|
    +-----+--------------------+--------------------+--------------------+----------+
    only showing top 20 rows
    

    This contains the predictive features, the ground truth, the probabilities of the two classes, and the final prediction (with a decision threshold of 0.5).

  5. We evaluate the Area Under Curve (AUC) of the Receiver Operating Characteristics (ROC) on the testing set using the BinaryClassificationEvaluator function with the areaUnderROC evaluation metric:
    >>> from pyspark.ml.evaluation import BinaryClassificationEvaluator
    >>> ev = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",
    ...                                    metricName="areaUnderROC")
    >>> print(ev.evaluate(predictions))
    0.7488839207716323
    

We are thus able to obtain an AUC of 74.89%. Can we do better than this? Let's find out in the next section.

Feature engineering on categorical variables with Spark

In this chapter, I have demonstrated how to build an ad click predictor that learns from massive click logs using Spark. Thus far, we have been using one-hot encoding to employ categorical inputs. In this section, we will talk about two popular feature engineering techniques: feature hashing and feature interaction.

Feature hashing is an alternative to one-hot encoding, while feature interaction builds new features by combining existing ones. Feature engineering means generating new features based on domain knowledge or defined rules, in order to improve the learning performance achieved with the existing feature space.

Hashing categorical features

In machine learning, feature hashing (also called the hashing trick) is an efficient way to encode categorical features. It is based on hashing functions in computer science, which map data of variable sizes to data of a fixed (and usually smaller) size. It is easier to understand feature hashing through an example.

Let's say we have three features: gender, site_domain, and device_model:

gender  | site_domain | device_model
--------|-------------|-------------
male    | cnn         | samsung
female  | abc         | iphone
male    | nbc         | huawei
male    | facebook    | xiaomi
female  | abc         | iphone

Table 6.1: Example data of three categorical features

With one-hot encoding, these will become feature vectors of size 10, which comes from 2 (from gender) + 4 (from site_domain) + 4 (from device_model). With feature hashing, we want to obtain a feature vector of size 4. We define a hash function that sums the Unicode code points of every character in a row, divides the result by 4, and takes the remainder as the hashed output. Take the first row as an example; we have the following:

ord(m) + ord(a) + ord(l) + ord(e) + … + ord(s) + ord(u) + ord(n) + ord(g) =

109 + 97 + 108 + 101 + … + 115 + 117 + 110 + 103 = 1500

1500 % 4 = 0, which means we can encode this sample into [1 0 0 0]. Similarly, if the remainder is 1, a sample is hashed into [0 1 0 0]; a sample with a remainder of 2 into [0 0 1 0]; and a sample with a remainder of 3 into [0 0 0 1].

Similarly, for other rows, we have the following:

gender  | site_domain | device_model | hash result
--------|-------------|--------------|------------
male    | cnn         | samsung      | [1 0 0 0]
female  | abc         | iphone       | [0 0 0 1]
male    | nbc         | huawei       | [0 1 0 0]
male    | facebook    | xiaomi       | [1 0 0 0]
female  | abc         | iphone       | [0 0 0 1]

Table 6.2: Hash results of the example data

In the end, we use the four-dimension hashed vectors to represent the original data, instead of the ten-dimension one-hot encoded ones.
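
To make the arithmetic concrete, here is a plain Python sketch of this toy hash function (for illustration only; it is not the internal hash that Spark's FeatureHasher uses):

>>> def toy_hash(row, size=4):
...     # Sum the Unicode code points of every character across the row's
...     # values, then use the remainder modulo size as the active index
...     total = sum(ord(ch) for value in row for ch in value)
...     vector = [0] * size
...     vector[total % size] = 1
...     return vector
...
>>> toy_hash(('male', 'cnn', 'samsung'))
[1, 0, 0, 0]
>>> toy_hash(('female', 'abc', 'iphone'))
[0, 0, 0, 1]
>>> toy_hash(('male', 'nbc', 'huawei'))
[0, 1, 0, 0]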

There are a few things to note about feature hashing:

  • The same input will always be converted into the same output; for instance, the second and fifth rows.
  • Two different inputs might be converted into the same output, such as the first and fourth rows. This phenomenon is called hashing collision.
  • Hence, the choice of the resulting fixed size is important. It will result in serious collision and information loss if the size is too small. If it is too large, it is basically a redundant one-hot encoding. With the correct size, it will make hashing space-efficient and, at the same time, preserve important information, which will further benefit downstream tasks.
  • Hashing is one-way, which means we cannot revert the output to its input, while one-hot encoding is a two-way mapping.

Let's now adopt feature hashing for our click prediction project. Recall that the one-hot encoded vectors are of size 31,458. If we choose 10,000 as the fixed hashing size, we will be able to cut the space to less than a third, and reduce the memory consumed by training the classification model. Also, we will see how quick it is to perform feature hashing compared to one-hot encoding, as there is no need to keep track of all unique values across all columns.

It just maps each individual row of string values to a sparse vector through internal hash functions, as follows:

  1. We begin by importing the feature hashing module from PySpark and initializing a feature hasher with an output size of 10000:
    >>> from pyspark.ml.feature import FeatureHasher
    >>> hasher = FeatureHasher(numFeatures=10000,
    ...                        inputCols=categorical, outputCol="features")
    
  2. We use the defined hasher to convert the input DataFrame:
    >>> hasher.transform(df_train).select("features").show()
    +--------------------+
    |            features|
    +--------------------+
    |(10000,[1228,1289...|
    |(10000,[1228,1289...|
    |(10000,[1228,1289...|
    |(10000,[1228,1289...|
    |(10000,[1228,1289...|
    |(10000,[1228,1289...|
    |(10000,[29,1228,1...|
    |(10000,[1228,1289...|
    |(10000,[1228,1289...|
    |(10000,[1228,1289...|
    |(10000,[1228,1289...|
    |(10000,[1228,1289...|
    |(10000,[1228,1289...|
    |(10000,[1228,1289...|
    |(10000,[1228,1289...|
    |(10000,[1228,1289...|
    |(10000,[1228,1289...|
    |(10000,[746,1060,...|
    |(10000,[675,1228,...|
    |(10000,[1289,1695...|
    +--------------------+
    only showing top 20 rows
    

    As you can see, the size of the resulting column, features, is 10000. Again, there is no training or fitting in feature hashing. The hasher is a predefined mapping.

  3. For better organization of the entire workflow, we chain the hasher and classification model together into a pipeline:
    >>> classifier = LogisticRegression(maxIter=20, regParam=0.000,
    ...                                 elasticNetParam=0.000)
    >>> stages = [hasher, classifier]
    >>> pipeline = Pipeline(stages=stages)
    
  4. We fit the pipelined model on the training set as follows:
    >>> model = pipeline.fit(df_train)
    
  5. We apply the trained model on the testing set and record the prediction results:
    >>> predictions = model.transform(df_test)
    >>> predictions.cache()
    
  6. We evaluate its performance in terms of the AUC of ROC:
    >>> ev = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",
    ...                                    metricName="areaUnderROC")
    >>> print(ev.evaluate(predictions))
    0.7448097180769776
    

We are able to achieve an AUC of 74.48%, which is close to the previous one of 74.89% with one-hot encoding. At the end of the day, we save a substantial amount of computational resources and attain a comparable prediction accuracy. That is a win.

With feature hashing, we lose interpretability but gain a computational advantage.

Combining multiple variables – feature interaction

Among all the features of the click log data, some are very weak signals in themselves. For example, gender itself doesn't tell us much regarding whether someone will click an ad, and the device model itself doesn't provide much information either.

However, by combining multiple features, we will be able to create a stronger synthesized signal. Feature interaction (also known as feature crossing) is introduced for this purpose. For numerical features, it usually generates new features by multiplying two or more of them.

We can also define whatever integration rules we want. For example, we can generate an additional feature, income/person, from two original features, household income and household size:

household income | household size | income/person
-----------------|----------------|--------------
300,000          | 2              | 150,000
100,000          | 1              | 100,000
400,000          | 4              | 100,000
300,000          | 5              | 60,000
200,000          | 2              | 100,000

Table 6.3: An example of generating a new numerical feature based on existing ones
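
As a quick sketch of how such a derived column could be computed in PySpark (household_income and household_size are hypothetical columns from this toy table, not part of our click dataset):

>>> from pyspark.sql.functions import col
>>> df_income = spark.createDataFrame(
...     [(300000, 2), (100000, 1), (400000, 4)],
...     ["household_income", "household_size"])
>>> # Derive the new numerical feature by dividing one column by the other
>>> df_income = df_income.withColumn(
...     "income_per_person",
...     col("household_income") / col("household_size"))
>>> df_income.first()
Row(household_income=300000, household_size=2, income_per_person=150000.0)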

For categorical features, feature interaction becomes an AND operation on two or more features. In the following example, we are generating an additional feature, gender:site_domain, from two original features, gender and site_domain:

gender  | site_domain | gender:site_domain
--------|-------------|-------------------
male    | cnn         | male:cnn
female  | abc         | female:abc
male    | nbc         | male:nbc
male    | facebook    | male:facebook
female  | abc         | female:abc

Table 6.4: An example of generating a new categorical feature based on existing ones

We then use one-hot encoding to transform string values. On top of six one-hot encoded features (two from gender and four from site_domain), feature interaction between gender and site_domain adds eight further features (two by four).
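
If you ever want to build such a crossed column by hand, simple string concatenation works; here is a sketch on the toy gender and site_domain columns (for our project, we will instead rely on RFormula in the next steps):

>>> from pyspark.sql.functions import concat_ws
>>> df_toy = spark.createDataFrame([("male", "cnn"), ("female", "abc")],
...                                ["gender", "site_domain"])
>>> # Concatenate the two categorical values with ":" as the separator
>>> df_toy = df_toy.withColumn("gender_site_domain",
...                            concat_ws(":", "gender", "site_domain"))
>>> df_toy.first()
Row(gender='male', site_domain='cnn', gender_site_domain='male:cnn')

The crossed column can then be indexed and one-hot encoded like any other categorical feature.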

Let's now adopt feature interaction for our click prediction project. We will take two features, C14 and C15, as an example of an AND interaction:

  1. First, we import the feature interaction module, RFormula, from PySpark:
    >>> from pyspark.ml.feature import RFormula
    

    An RFormula model takes in a formula that describes how features interact. For instance, y ~ a + b means it takes in features a and b, and predicts y based on them; y ~ a + b + a:b means it predicts y based on features a and b, plus the interaction term, a AND b; y ~ a + b + c + a:b means it predicts y based on features a, b, and c, plus the interaction term, a AND b.

  2. We need to define an interaction formula accordingly:
    >>> cat_inter = ['C14', 'C15']
    >>> cat_no_inter = [c for c in categorical if c not in cat_inter]
    >>> concat = '+'.join(categorical)
    >>> interaction = ':'.join(cat_inter)
    >>> formula = "label ~ " + concat + '+' + interaction
    >>> print(formula)
    label ~ C1+banner_pos+site_id+site_domain+site_category+app_id+app_domain+app_category+device_model+device_type+device_conn_type+C14+C15+C16+C17+C18+C19+C20+C21+C14:C15
    
  3. Now, we can initialize a feature interactor with this formula:
    >>> interactor = RFormula(
    ...     formula=formula,
    ...     featuresCol="features",
    ...     labelCol="label").setHandleInvalid("keep")
    

    Again, the setHandleInvalid("keep") handle here makes sure it won't crash if any new categorical value occurs.

  4. We use the defined feature interactor to fit and transform the input DataFrame:
    >>> interactor.fit(df_train).transform(df_train).select("features").show()
    +--------------------+
    |            features|
    +--------------------+
    |(54930,[5,7,3527,...|
    |(54930,[5,7,788,4...|
    |(54930,[5,7,788,4...|
    |(54930,[5,7,788,4...|
    |(54930,[5,7,788,4...|
    |(54930,[5,7,788,4...|
    |(54930,[5,7,788,4...|
    |(54930,[5,7,788,4...|
    |(54930,[5,7,788,4...|
    |(54930,[5,7,788,4...|
    |(54930,[5,7,788,4...|
    |(54930,[5,7,788,4...|
    |(54930,[5,7,788,4...|
    |(54930,[5,7,1271,...|
    |(54930,[5,7,1271,...|
    |(54930,[5,7,1271,...|
    |(54930,[5,7,1271,...|
    |(54930,[5,7,1532,...|
    |(54930,[5,7,4366,...|
    |(54930,[5,7,14,45...|
    +--------------------+
    only showing top 20 rows
    

    More than 20,000 features are added to the feature space due to the interaction term of C14 and C15.

  5. Again, we chain the feature interactor and classification model together into a pipeline to organize the entire workflow:
    >>> classifier = LogisticRegression(maxIter=20, regParam=0.000,
    ...                                 elasticNetParam=0.000)
    >>> stages = [interactor, classifier]
    >>> pipeline = Pipeline(stages=stages)
    >>> model = pipeline.fit(df_train)
    >>> predictions = model.transform(df_test)
    >>> predictions.cache()
    >>> from pyspark.ml.evaluation import BinaryClassificationEvaluator
    >>> ev = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",
    ...                                    metricName="areaUnderROC")
    >>> print(ev.evaluate(predictions))
    0.7490392990518315
    

An AUC of 74.90% with the additional interaction between C14 and C15 is an improvement over the 74.89% achieved without any interaction. Therefore, feature interaction slightly boosts the model's performance.

Summary

In this chapter, we continued working on the online advertising click-through prediction project. This time, we were able to train the classifier on the entire dataset with millions of records, with the help of the parallel computing tool Apache Spark. We discussed the basics of Spark, including its major components, the deployment of Spark programs, the programming essentials of PySpark, and the Python interface of Spark. Then, we programmed using PySpark to explore the click log data.

You learned how to perform one-hot encoding, cache intermediate results, develop classification solutions based on the entire click log dataset, and evaluate performance. In addition, I introduced two feature engineering techniques, feature hashing and feature interaction, in order to improve prediction performance. We had fun implementing them in PySpark as well.

Looking back on our learning journey, we have been working on classification problems since Chapter 2, Building a Movie Recommendation Engine with Naïve Bayes. Actually, we have covered all the powerful and popular classification models in machine learning. We will move on to solving regression problems in the next chapter; regression is the sibling of classification in supervised learning. You will learn about regression models, including linear regression, decision trees for regression, and support vector regression.

Exercises

  1. In the one-hot encoding solution, can you use different classifiers supported in PySpark instead of logistic regression, such as decision trees, random forests, or linear SVM?
  2. In the feature hashing solution, can you try other hash sizes, such as 5,000 or 20,000? What do you observe?
  3. In the feature interaction solution, can you try other interactions, such as C1 and C20?
  4. Can you first use feature interaction and then feature hashing in order to lower the expanded dimension? Are you able to obtain a higher AUC?