In the previous chapter, we developed an ad click-through predictor using a logistic regression classifier. We proved that the algorithm is highly scalable by training efficiently on up to 1 million click log samples. In this chapter, we will further boost the scalability of the ad click-through predictor by utilizing a powerful parallel computing (or, more specifically, distributed computing) tool called Apache Spark.
This chapter will demystify how Apache Spark is used to scale up learning on massive data, as opposed to limiting model learning to one single machine. We will also use PySpark, which is the Python API, to explore click log data, to develop classification solutions based on the entire click log dataset, and to evaluate performance, all in a distributed manner. Aside from this, I will introduce two approaches to handling categorical features: one is related to hashing in computer science, while the other fuses multiple features. They will be implemented in Spark as well.
In this chapter, we will cover the following topics:
Apache Spark is a distributed cluster computing framework designed for fast and general-purpose computation. It is an open-source technology originally developed by Berkeley's AMPLab at the University of California. It provides an easy-to-use interface for programming interactive queries and stream processing data. What makes it a popular big data analytics tool is its implicit data parallelism, where it automates operations on data in parallel across processors in the computing cluster. Users only need to focus on how they want to manipulate the data, without worrying about how it is distributed among all the computing nodes or which part of the data a node is responsible for.
Bear in mind that this book is mainly about machine learning. Hence, we will only briefly cover the fundamentals of Spark, including its components, installation, deployment, data structure, and core programming.
We will start with the main components of Spark, which are depicted in the following diagram:
Figure 6.1: The main components of Spark
Let's discuss them in more detail:
The main goal of this section is to understand Spark as a distributed cluster computing framework designed for fast computation, which facilitates both data analytics and iterative learning. If you are looking for more detailed information on Spark, there is a lot of useful documentation and tutorials available online, such as https://spark.apache.org/docs/latest/quick-start.html.
For learning purposes, let's now install Spark on the local computer (even though it is more frequently used in a cluster of servers). Full instructions can be found at https://spark.apache.org/downloads.html. There are several versions, and we will take version 2.4.5 (Feb 05, 2020), pre-built for Apache Hadoop 2.7, as an example.
At the time of writing, the latest stable version is 2.4.5. Although there is a preview version, 3.0.0, I think the latest stable version is enough to start with. You won't notice much difference between 3.0 and 2.4.5 when going through this chapter. Please note that the class pyspark.ml.feature.OneHotEncoderEstimator has been deprecated, and is removed in version 3.0.0 and above. Its functionality has been merged into pyspark.ml.feature.OneHotEncoder.
As illustrated in the following screenshot, after selecting 2.4.5 in step 1, we choose the Pre-built for Apache Hadoop 2.7 option in step 2. Then, we click the link in step 3 to download the spark-2.4.5-bin-hadoop2.7.tgz file:
Figure 6.2: Steps to download Spark
Unzip the downloaded file. The resulting folder contains a complete Spark package; you don't need to do any extra installation.
Before running any Spark program, we need to make sure the following dependencies are installed:
To check whether Spark is installed properly, we run the following tests:
Note that bin is a folder in spark-2.4.5-bin-hadoop2.7, so remember to run the following commands from inside the spark-2.4.5-bin-hadoop2.7 folder:
./bin/run-example SparkPi 10
Pi is roughly 3.141851141851142
This test is actually similar to the following:
./bin/spark-submit examples/src/main/python/pi.py 10
./bin/pyspark --master local[2]
This should open a Python interpreter, as shown in the following screenshot:
Figure 6.3: Running Spark in the shell
By now, the Spark program should be installed properly. We will talk about the commands (pyspark and spark-submit) in the following sections.
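For intuition about what the Pi test computes: the bundled pi.py example estimates pi by Monte Carlo sampling, and Spark spreads the sampling work across its workers. The following single-process sketch shows the same idea in plain Python. The function name estimate_pi, the seed, and the sample count are illustrative assumptions, not part of Spark.

```python
import random


def estimate_pi(num_samples, seed=42):
    """Estimate pi as 4 * (fraction of random points inside the unit circle)."""
    rng = random.Random(seed)
    inside = 0
    for _ in range(num_samples):
        # Draw a point uniformly from the square [-1, 1] x [-1, 1]
        x = rng.uniform(-1, 1)
        y = rng.uniform(-1, 1)
        if x * x + y * y <= 1:
            inside += 1
    return 4.0 * inside / num_samples


print("Pi is roughly", estimate_pi(200_000))
```

The more samples we draw, the closer the estimate tends to get, which is why this kind of workload benefits from being split across many workers.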
A Spark program can run by itself or over cluster managers. The first option is similar to running a program locally with multiple threads, where one thread is considered one Spark job worker. Of course, nothing is distributed across machines in this mode, but it is a quick and easy way to launch a Spark application, and we will be deploying it in this mode by way of demonstration throughout this chapter. For example, we can run the following script to launch a Spark application:
./bin/spark-submit examples/src/main/python/pi.py
This is precisely what we did in the previous section. Or, we can specify the number of threads:
./bin/spark-submit --master local[4] examples/src/main/python/pi.py
In the previous code, we run Spark locally with four worker threads. Alternatively, we can use as many worker threads as there are cores on the machine by using the following command:
./bin/spark-submit --master local[*] examples/src/main/python/pi.py
Similarly, we can launch the interactive shell by replacing spark-submit with pyspark and dropping the script path, since the shell does not run application scripts:
./bin/pyspark --master local[2]
As for the cluster mode, it (version 2.4.5) currently supports the following approaches:
It's easy to launch and deploy a Spark application. How about coding in PySpark? Let's see in the next section.
This section provides a quick introduction to programming with Python in Spark. We will start with the basic data structures in Spark.
A Resilient Distributed Dataset (RDD) is the primary data structure in Spark. It is a distributed collection of objects and has the following three main features:
RDD was the main data structure in Spark before version 2.0. After that, it was replaced by the DataFrame, which is also a distributed collection of data but organized into named columns. DataFrames utilize the optimized execution engine of Spark SQL, and they are conceptually similar to a table in a relational database or a DataFrame object in the Python pandas library.
Although the current version of Spark still supports RDD, programming with DataFrames is highly recommended. Hence, we won't spend too much time on programming with RDD. Refer to https://spark.apache.org/docs/latest/rdd-programming-guide.html if you are interested.
The entry point to a Spark program is creating a Spark session, which can be done by using the following lines:
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession \
...     .builder \
...     .appName("test") \
...     .getOrCreate()
Note that this is not needed if we run it in a PySpark shell. Right after we spin up a PySpark shell (with ./bin/pyspark), a Spark session is automatically created. We can check the running Spark application at the following link: localhost:4040/jobs/. Refer to the following screenshot for the resulting page:
Figure 6.4: Spark application UI
With a Spark session, spark, a DataFrame object can be created by reading a file (which is usually the case) or manual input. In the following example, we will create a DataFrame object, df, from a CSV file:
>>> df = spark.read.csv("examples/src/main/resources/people.csv",
...                     header=True, sep=';')
Columns in the CSV file people.csv are separated by ;.
Once this is done, we will see a completed job in localhost:4040/jobs/:
Figure 6.5: Completed job list in the Spark application
We can display the contents of the df object by using the following command:
>>> df.show()
+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+
We can count the number of rows by using the following command:
>>> df.count()
2
The schema of the df object can be displayed using the following command:
>>> df.printSchema()
root
|-- name: string (nullable = true)
|-- age: string (nullable = true)
|-- job: string (nullable = true)
One or more columns can be selected as follows:
>>> df.select("name").show()
+-----+
| name|
+-----+
|Jorge|
|  Bob|
+-----+
>>> df.select(["name", "job"]).show()
+-----+---------+
| name|      job|
+-----+---------+
|Jorge|Developer|
|  Bob|Developer|
+-----+---------+
We can filter rows by condition, for instance, by the value of one column, using the following command:
>>> df.filter(df['age'] > 31).show()
+----+---+---------+
|name|age|      job|
+----+---+---------+
| Bob| 32|Developer|
+----+---+---------+
We will continue programming in PySpark in the next section, where we will use Spark to solve the ad click-through problem.
Normally, in order to take advantage of Spark, data is stored using Hadoop Distributed File System (HDFS), which is a distributed file system designed to store large volumes of data, and computation occurs over multiple nodes on clusters. For demonstration purposes, we will keep the data on a local machine and run Spark locally. The code is no different from what we would run on a distributed computing cluster.
To train a model on massive click logs, we first need to load the data in Spark. We do so by taking the following steps:
./bin/pyspark --master local[*] --driver-memory 20G
Here, we specify a large driver memory as we are dealing with a dataset of more than 6 GB.
A driver program is responsible for collecting and storing processed results from executors. So, a large driver memory helps complete jobs where lots of data is processed.
Next, we create a Spark session with an application named CTR:
>>> spark = SparkSession \
...     .builder \
...     .appName("CTR") \
...     .getOrCreate()
Then, we load the click log data from the train file into a DataFrame object. Note that the data load function spark.read.csv allows custom schemas, which guarantees data is loaded as expected, as opposed to automatically inferring schemas. So, first, we define the schema:
>>> from pyspark.sql.types import (StructField, StringType,
...                                StructType, IntegerType)
>>> schema = StructType([
... StructField("id", StringType(), True),
... StructField("click", IntegerType(), True),
... StructField("hour", IntegerType(), True),
... StructField("C1", StringType(), True),
... StructField("banner_pos", StringType(), True),
... StructField("site_id", StringType(), True),
... StructField("site_domain", StringType(), True),
... StructField("site_category", StringType(), True),
... StructField("app_id", StringType(), True),
... StructField("app_domain", StringType(), True),
... StructField("app_category", StringType(), True),
... StructField("device_id", StringType(), True),
... StructField("device_ip", StringType(), True),
... StructField("device_model", StringType(), True),
... StructField("device_type", StringType(), True),
... StructField("device_conn_type", StringType(), True),
... StructField("C14", StringType(), True),
... StructField("C15", StringType(), True),
... StructField("C16", StringType(), True),
... StructField("C17", StringType(), True),
... StructField("C18", StringType(), True),
... StructField("C19", StringType(), True),
... StructField("C20", StringType(), True),
... StructField("C21", StringType(), True),
... ])
Each field of the schema contains the name of the column (such as id, click, or hour), the data type (such as integer or string), and whether missing values are allowed (allowed, in this case).
With the defined schema, we load the data into a DataFrame object, df:
>>> df = spark.read.csv("file://path_to_file/train", schema=schema,
...                     header=True)
Remember to replace path_to_file with the absolute path of where the train data file is located. The file:// prefix indicates that data is read from a local file. Another prefix, hdfs://, is used for data stored in HDFS.
>>> df.printSchema()
root
|-- id: string (nullable = true)
|-- click: integer (nullable = true)
|-- hour: integer (nullable = true)
|-- C1: string (nullable = true)
|-- banner_pos: string (nullable = true)
|-- site_id: string (nullable = true)
|-- site_domain: string (nullable = true)
|-- site_category: string (nullable = true)
|-- app_id: string (nullable = true)
|-- app_domain: string (nullable = true)
|-- app_category: string (nullable = true)
|-- device_id: string (nullable = true)
|-- device_ip: string (nullable = true)
|-- device_model: string (nullable = true)
|-- device_type: string (nullable = true)
|-- device_conn_type: string (nullable = true)
|-- C14: string (nullable = true)
|-- C15: string (nullable = true)
|-- C16: string (nullable = true)
|-- C17: string (nullable = true)
|-- C18: string (nullable = true)
|-- C19: string (nullable = true)
|-- C20: string (nullable = true)
|-- C21: string (nullable = true)
>>> df.count()
40428967
>>> df = df.drop('id').drop('hour').drop('device_id').drop('device_ip')
We rename the column from click to label, as this will be consumed more often in the downstream operations:
>>> df = df.withColumnRenamed("click", "label")
>>> df.columns
['label', 'C1', 'banner_pos', 'site_id', 'site_domain', 'site_category', 'app_id', 'app_domain', 'app_category', 'device_model', 'device_type', 'device_conn_type', 'C14', 'C15', 'C16', 'C17', 'C18', 'C19', 'C20', 'C21']
After inspecting the input data, we need to split and cache the data.
Here, we split the data into a training set and testing set, as follows:
>>> df_train, df_test = df.randomSplit([0.7, 0.3], 42)
In this case, 70% of the samples are used for training and the remaining samples are used for testing, with a random seed specified, as always, for reproducibility.
Before we perform any heavy lifting (such as model learning) on the training set, df_train, it is good practice to cache the object. In Spark, caching and persistence are optimization techniques that reduce the computation overhead by saving the intermediate results of RDD or DataFrame operations in memory and/or on disk. Without caching or persistence, whenever an intermediate DataFrame is needed, it is recalculated according to how it was originally created. Depending on the storage level, persistence behaves differently:
MEMORY_ONLY: The object is only stored in memory. If it does not fit in memory, the remaining part will be recomputed each time it is needed.
DISK_ONLY: The object is only kept on disk. A persisted object can be extracted directly from storage without being recalculated.
MEMORY_AND_DISK: The object is stored in memory, and might be on disk as well. If the full object does not fit in memory, the remaining partitions will be stored on disk, instead of being recalculated every time they are needed. This is the default mode for caching and persisting DataFrames in Spark. It takes advantage of both the fast retrieval of in-memory storage and the high accessibility and capacity of disk storage.
In PySpark, caching is simple. All that is required is a call to the cache method.
Let's cache both the training and testing DataFrames:
>>> df_train.cache()
DataFrame[label: int, C1: string, banner_pos: string, site_id: string, site_domain: string, site_category: string, app_id: string, app_domain: string, app_category: string, device_model: string, device_type: string, device_conn_type: string, C14: string, C15: string, C16: string, C17: string, C18: string, C19: string, C20: string, C21: string]
>>> df_train.count()
28297027
>>> df_test.cache()
DataFrame[label: int, C1: string, banner_pos: string, site_id: string, site_domain: string, site_category: string, app_id: string, app_domain: string, app_category: string, device_model: string, device_type: string, device_conn_type: string, C14: string, C15: string, C16: string, C17: string, C18: string, C19: string, C20: string, C21: string]
>>> df_test.count()
12131940
Now, we have the training and testing data ready for downstream analysis.
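To see why caching pays off, and why we call count right after cache, consider the following toy analogy in plain Python. It is not Spark code: LazyDataset is a hypothetical stand-in for a lazily evaluated dataset that rebuilds its data for every action unless it has been marked for caching.

```python
class LazyDataset:
    """Toy stand-in for a lazily evaluated dataset (not a Spark class)."""

    def __init__(self, compute_fn):
        self._compute_fn = compute_fn  # recipe for building the data
        self._cached = None
        self._use_cache = False
        self.num_computations = 0      # how often the recipe has run

    def cache(self):
        # Only mark the dataset for caching; nothing is computed yet
        self._use_cache = True
        return self

    def _materialize(self):
        if self._use_cache and self._cached is not None:
            return self._cached
        self.num_computations += 1
        data = self._compute_fn()
        if self._use_cache:
            self._cached = data
        return data

    def count(self):
        # An "action": this is what triggers the computation
        return len(self._materialize())


def make_rows():
    return [x * x for x in range(1000)]


uncached = LazyDataset(make_rows)
uncached.count()
uncached.count()
print(uncached.num_computations)  # recomputed for each action

cached = LazyDataset(make_rows).cache()
cached.count()
cached.count()
print(cached.num_computations)    # computed once, then reused
```

In the same spirit, cache in Spark only marks a DataFrame for caching; the first action that follows (count, in our case) is what actually materializes it.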
Similar to the previous chapter, we need to encode categorical features into sets of multiple binary features by executing the following steps:
>>> categorical = df_train.columns
>>> categorical.remove('label')
>>> print(categorical)
['C1', 'banner_pos', 'site_id', 'site_domain', 'site_category', 'app_id', 'app_domain', 'app_category', 'device_model', 'device_type', 'device_conn_type', 'C14', 'C15', 'C16', 'C17', 'C18', 'C19', 'C20', 'C21']
In PySpark, one-hot encoding is not as direct as it is in scikit-learn (specifically, with the OneHotEncoder module).
First, we need to index each categorical column using the StringIndexer module:
>>> from pyspark.ml.feature import StringIndexer
>>> indexers = [
...     StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
...         .setHandleInvalid("keep")
...     for c in categorical
... ]
The setHandleInvalid("keep") handle makes sure the application won't crash if any new categorical value occurs. Try to omit it; you will see error messages related to unknown values.
Then, we perform one-hot encoding on each individual indexed categorical column using the OneHotEncoderEstimator module:
>>> from pyspark.ml.feature import OneHotEncoderEstimator
>>> encoder = OneHotEncoderEstimator(
...     inputCols=[indexer.getOutputCol() for indexer in indexers],
...     outputCols=["{0}_encoded".format(indexer.getOutputCol())
...                 for indexer in indexers]
... )
Next, we concatenate all the generated binary vectors into a single one using the VectorAssembler module:
>>> from pyspark.ml.feature import VectorAssembler
>>> assembler = VectorAssembler(
... inputCols=encoder.getOutputCols(),
... outputCol="features"
... )
This creates the final encoded vector column called features.
We chain these three stages together into a pipeline with the Pipeline module in PySpark, which better organizes our one-hot encoding workflow:
>>> stages = indexers + [encoder, assembler]
>>> from pyspark.ml import Pipeline
>>> pipeline = Pipeline(stages=stages)
The variable stages is a list of operations needed for encoding.
Finally, we can fit the pipeline one-hot encoding model over the training set:
>>> one_hot_encoder = pipeline.fit(df_train)
>>> df_train_encoded = one_hot_encoder.transform(df_train)
>>> df_train_encoded.show()
The result has many columns, with several additional ones added on top of those in df_train. However, we can see the one we are looking for, the features column, which contains the one-hot encoded results. Hence, we only select this column, along with the target variable:
>>> df_train_encoded = df_train_encoded.select(
...     ["label", "features"])
>>> df_train_encoded.show()
+-----+--------------------+
|label| features|
+-----+--------------------+
| 0|(31458,[5,7,3527,...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,1271,...|
| 0|(31458,[5,7,1271,...|
| 0|(31458,[5,7,1271,...|
| 0|(31458,[5,7,1271,...|
| 0|(31458,[5,7,1532,...|
| 0|(31458,[5,7,4366,...|
| 0|(31458,[5,7,14,45...|
+-----+--------------------+
only showing top 20 rows
Don't forget to cache df_train_encoded, as we will be using it to iteratively train our classification model:
>>> df_train_encoded.cache()
DataFrame[label: int, features: vector]
We then release df_train, since we will no longer need it:
>>> df_train.unpersist()
DataFrame[label: int, C1: string, banner_pos: string, site_id: string, site_domain: string, site_category: string, app_id: string, app_domain: string, app_category: string, device_model: string, device_type: string, device_conn_type: string, C14: string, C15: string, C16: string, C17: string, C18: string, C19: string, C20: string, C21: string]
>>> df_test_encoded = one_hot_encoder.transform(df_test)
>>> df_test_encoded = df_test_encoded.select(["label", "features"])
>>> df_test_encoded.show()
+-----+--------------------+
|label| features|
+-----+--------------------+
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,2859,...|
| 0|(31458,[1,7,651,4...|
+-----+--------------------+
only showing top 20 rows
>>> df_test_encoded.cache()
DataFrame[label: int, features: vector]
>>> df_test.unpersist()
DataFrame[label: int, C1: string, banner_pos: string, site_id: string, site_domain: string, site_category: string, app_id: string, app_domain: string, app_category: string, device_model: string, device_type: string, device_conn_type: string, C14: string, C15: string, C16: string, C17: string, C18: string, C19: string, C20: string, C21: string]
If you check the Spark UI at localhost:4040/jobs/ in your browser, you will see several completed jobs, such as the following:
Figure 6.6: List of jobs completed after encoding
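To make the three encoding stages more concrete, here is a toy sketch in plain Python that mimics indexing, one-hot encoding, and vector assembling on a handful of made-up rows. It is a simplification under stated assumptions: the rows and helper functions are hypothetical, and details of the Spark implementation, such as the handling of unseen values and the sparse vector output, are ignored.

```python
from collections import Counter

# Hypothetical toy rows with two categorical columns
rows = [
    {"gender": "male", "site_domain": "cnn"},
    {"gender": "female", "site_domain": "abc"},
    {"gender": "male", "site_domain": "nbc"},
    {"gender": "male", "site_domain": "facebook"},
    {"gender": "female", "site_domain": "abc"},
]
columns = ["gender", "site_domain"]


def fit_indexer(rows, column):
    """Stage 1: map each category to an integer, most frequent first."""
    counts = Counter(row[column] for row in rows)
    # Sort by descending frequency, then alphabetically to break ties
    ordered = sorted(counts, key=lambda value: (-counts[value], value))
    return {value: index for index, value in enumerate(ordered)}


def one_hot(index, size):
    """Stage 2: turn an integer index into a binary vector."""
    return [1 if i == index else 0 for i in range(size)]


def encode(row, indexers):
    """Stage 3: concatenate the per-column binary vectors."""
    features = []
    for column in columns:
        mapping = indexers[column]
        features += one_hot(mapping[row[column]], len(mapping))
    return features


# "Fit" on the data first, then "transform" each row
indexers = {column: fit_indexer(rows, column) for column in columns}
encoded = [encode(row, indexers) for row in rows]
for row, vector in zip(rows, encoded):
    print(row, vector)
```

As in the PySpark pipeline, the mapping is learned once from the training data and then reused to transform any dataset, which is why we fit on df_train only and apply the fitted model to both sets.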
With the encoded training and testing sets ready, we can now train our classification model.
We will use logistic regression as our example, but there are many other classification models supported in PySpark, such as decision tree classifiers, random forests, neural networks (which we will be studying in Chapter 8, Predicting Stock Prices with Artificial Neural Networks), linear SVM, and Naïve Bayes. For further details, please refer to the following link: https://spark.apache.org/docs/latest/ml-classification-regression.html#classification.
We can train and test a logistic regression model by using the following steps:
>>> from pyspark.ml.classification import LogisticRegression
>>> classifier = LogisticRegression(maxIter=20, regParam=0.001,
...                                 elasticNetParam=0.001)
Here, we set the maximum iterations as 20, and both the regularization parameter and the elastic net mixing parameter as 0.001.
>>> lr_model = classifier.fit(df_train_encoded)
Be aware that this might take a while. You can check the running or completed jobs in the Spark UI in the meantime. Refer to the following screenshot for some completed jobs:
Figure 6.7: List of jobs completed after training
Note that each RDDLossFunction represents an iteration of optimizing the logistic regression classifier.
>>> predictions = lr_model.transform(df_test_encoded)
>>> predictions.cache()
DataFrame[label: int, features: vector, rawPrediction: vector, probability: vector, prediction: double]
Take a look at the prediction DataFrame:
>>> predictions.show()
+-----+--------------------+--------------------+--------------------+----------+
|label| features| rawPrediction| probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
| 0|(31458,[5,7,788,4...|[2.80267740289335...|[0.94282033454271...| 0.0|
| 0|(31458,[5,7,788,4...|[2.72243908463177...|[0.93833781006061...| 0.0|
| 0|(31458,[5,7,788,4...|[2.72243908463177...|[0.93833781006061...| 0.0|
| 0|(31458,[5,7,788,4...|[2.82083664358057...|[0.94379146612755...| 0.0|
| 0|(31458,[5,7,788,4...|[2.82083664358057...|[0.94379146612755...| 0.0|
| 0|(31458,[5,7,14,45...|[4.44920221201642...|[0.98844714081261...| 0.0|
| 0|(31458,[5,7,14,45...|[4.44920221201642...|[0.98844714081261...| 0.0|
| 0|(31458,[5,7,14,45...|[4.44920221201642...|[0.98844714081261...| 0.0|
| 0|(31458,[5,7,14,45...|[4.54759977096521...|[0.98951842852058...| 0.0|
| 0|(31458,[5,7,14,45...|[4.54759977096521...|[0.98951842852058...| 0.0|
| 0|(31458,[5,7,14,45...|[4.38991492595212...|[0.98775013592573...| 0.0|
| 0|(31458,[5,7,14,45...|[4.38991492595212...|[0.98775013592573...| 0.0|
| 0|(31458,[5,7,14,45...|[4.38991492595212...|[0.98775013592573...| 0.0|
| 0|(31458,[5,7,14,45...|[4.38991492595212...|[0.98775013592573...| 0.0|
| 0|(31458,[5,7,14,45...|[5.58870435258071...|[0.99627406423617...| 0.0|
| 0|(31458,[5,7,14,45...|[5.66066729150822...|[0.99653187592454...| 0.0|
| 0|(31458,[5,7,14,45...|[5.66066729150822...|[0.99653187592454...| 0.0|
| 0|(31458,[5,7,14,45...|[5.61336061100621...|[0.99636447866332...| 0.0|
| 0|(31458,[5,7,2859,...|[5.47553763410082...|[0.99582948965297...| 0.0|
| 0|(31458,[1,7,651,4...|[1.33424801682849...|[0.79154243844810...| 0.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 20 rows
This contains the predictive features, the ground truth, the probabilities of the two classes, and the final prediction (with a decision threshold of 0.5).
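As a quick sanity check on how these columns relate, the sketch below takes the first entry of rawPrediction from the first row shown above and passes it through the sigmoid function. It assumes the usual binary logistic regression relationship, in which the probability of a class is the sigmoid of its raw score; the variable names are illustrative.

```python
import math


def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))


# First entry of rawPrediction for the first row shown above
raw_class_0 = 2.80267740289335

prob_class_0 = sigmoid(raw_class_0)
prob_class_1 = 1.0 - prob_class_0

# Predict class 1 (click) only if its probability exceeds the 0.5 threshold
prediction = 1.0 if prob_class_1 > 0.5 else 0.0

print(round(prob_class_0, 6), prediction)
```

The computed probability agrees with the first entry of the probability column for that row, and since the click probability is well below 0.5, the prediction is 0.0.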
We evaluate the Area Under Curve (AUC) of the Receiver Operating Characteristic (ROC) on the testing set using the BinaryClassificationEvaluator function with the areaUnderROC evaluation metric:
>>> from pyspark.ml.evaluation import BinaryClassificationEvaluator
>>> ev = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",
...                                    metricName="areaUnderROC")
>>> print(ev.evaluate(predictions))
0.7488839207716323
We are hereby able to obtain an AUC of 74.89%. Can we do better than this? Let's see in the next section.
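Before moving on, it may help to recall what this metric measures: the AUC equals the probability that a randomly chosen positive sample receives a higher score than a randomly chosen negative one. The following plain-Python sketch computes it by brute force over all pairs. The labels and scores are made up for illustration, and the quadratic loop is only suitable for tiny inputs.

```python
def area_under_roc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly.

    A tie between a positive and a negative score counts as half.
    """
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    correct = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                correct += 1.0
            elif p == n:
                correct += 0.5
    return correct / (len(positives) * len(negatives))


# Hypothetical labels and predicted click probabilities
labels = [0, 0, 1, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.2, 0.8, 0.7]
print(area_under_roc(labels, scores))
```

Here, eight of the nine positive-negative pairs are ordered correctly. A value of 0.5 corresponds to random ranking, and 1.0 to a perfect one.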
In this chapter, I have demonstrated how to build an ad click predictor that learns from massive click logs using Spark. Thus far, we have been using one-hot encoding to employ categorical inputs. In this section, we will talk about two popular feature engineering techniques: feature hashing and feature interaction.
Feature hashing is an alternative to one-hot encoding, while feature interaction is a variant of one-hot encoding. Feature engineering means generating new features based on domain knowledge or defined rules, in order to improve the learning performance achieved with the existing feature space.
In machine learning, feature hashing (also called the hashing trick) is an efficient way to encode categorical features. It is based on hashing functions in computer science, which map data of variable sizes to data of a fixed (and usually smaller) size. It is easier to understand feature hashing through an example.
Let's say we have three features: gender, site_domain, and device_model:
gender | site_domain | device_model
male | cnn | samsung
female | abc | iphone
male | nbc | huawei
male | facebook | xiaomi
female | abc | iphone
Table 6.1: Example data of three categorical features
With one-hot encoding, these will become feature vectors of size 10, which comes from 2 (from gender) + 4 (from site_domain) + 4 (from device_model). With feature hashing, we want to obtain a feature vector of size 4. We define a hash function that sums the Unicode code points of all characters in a row, divides the result by 4, and takes the remainder as the hashed output. Take the first row as an example; we have the following:
ord(m) + ord(a) + ord(l) + ord(e) + … + ord(s) + ord(u) + ord(n) + ord(g) =
109 + 97 + 108 + 101 + … + 115 + 117 + 110 + 103 = 1500
1500 % 4 = 0, which means we can encode this sample into [1 0 0 0]. Similarly, a sample is hashed into [0 1 0 0] if the remainder is 1, [0 0 1 0] if the remainder is 2, and [0 0 0 1] if the remainder is 3.
Similarly, for other rows, we have the following:
gender | site_domain | device_model | hash result
male | cnn | samsung | [1 0 0 0]
female | abc | iphone | [0 0 0 1]
male | nbc | huawei | [0 1 0 0]
male | facebook | xiaomi | [1 0 0 0]
female | abc | iphone | [0 0 0 1]
Table 6.2: Hash results of the example data
In the end, we use the four-dimension hashed vectors to represent the original data, instead of the higher-dimensional one-hot encoded ones.
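The toy hash function described above can be sketched in a few lines of Python. The function names toy_hash and hashed_vector are my own, and the site_domain of the fourth row is taken to be facebook, as in Table 6.4. This only illustrates the idea; it is not the hash function PySpark uses internally.

```python
def toy_hash(values, num_buckets=4):
    """Sum the code points of all characters, then take the remainder."""
    total = sum(ord(ch) for value in values for ch in value)
    return total % num_buckets


def hashed_vector(values, num_buckets=4):
    """Binary vector with a single 1 at the hashed bucket."""
    vector = [0] * num_buckets
    vector[toy_hash(values, num_buckets)] = 1
    return vector


rows = [
    ("male", "cnn", "samsung"),
    ("female", "abc", "iphone"),
    ("male", "nbc", "huawei"),
    ("male", "facebook", "xiaomi"),
    ("female", "abc", "iphone"),
]
for row in rows:
    print(row, hashed_vector(row))
```

Note that the first and fourth rows land in the same bucket even though they are different samples. Such collisions are the price we pay for the fixed, smaller output size.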
There are a few things to note about feature hashing:
Let's now adopt feature hashing for our click prediction project. Recall that the one-hot encoded vectors are of size 31,458. If we choose 10,000 as the fixed hashing size, we will be able to cut the space to less than a third, and reduce the memory consumed by training the classification model. Also, we will see how quick it is to perform feature hashing compared to one-hot encoding, as there is no need to keep track of all unique values across all columns.
Feature hashing just maps each individual row of string values to a sparse vector through internal hash functions, as follows:
First, we import the feature hashing module from PySpark and initialize a feature hasher with an output size of 10000:
>>> from pyspark.ml.feature import FeatureHasher
>>> hasher = FeatureHasher(numFeatures=10000,
...                        inputCols=categorical, outputCol="features")
>>> hasher.transform(df_train).select("features").show()
+--------------------+
| features|
+--------------------+
|(10000,[1228,1289...|
|(10000,[1228,1289...|
|(10000,[1228,1289...|
|(10000,[1228,1289...|
|(10000,[1228,1289...|
|(10000,[1228,1289...|
|(10000,[29,1228,1...|
|(10000,[1228,1289...|
|(10000,[1228,1289...|
|(10000,[1228,1289...|
|(10000,[1228,1289...|
|(10000,[1228,1289...|
|(10000,[1228,1289...|
|(10000,[1228,1289...|
|(10000,[1228,1289...|
|(10000,[1228,1289...|
|(10000,[1228,1289...|
|(10000,[746,1060,...|
|(10000,[675,1228,...|
|(10000,[1289,1695...|
+--------------------+
only showing top 20 rows
As you can see, the size of the resulting column, features, is 10000. Again, there is no training or fitting in feature hashing. The hasher is a predefined mapping.
>>> classifier = LogisticRegression(maxIter=20, regParam=0.000,
...                                 elasticNetParam=0.000)
>>> stages = [hasher, classifier]
>>> pipeline = Pipeline(stages=stages)
>>> model = pipeline.fit(df_train)
>>> predictions = model.transform(df_test)
>>> predictions.cache()
>>> ev = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",
...                                    metricName="areaUnderROC")
>>> print(ev.evaluate(predictions))
0.7448097180769776
We are able to achieve an AUC of 74.48%, which is close to the previous one of 74.89% with one-hot encoding. At the end of the day, we save a substantial amount of computational resources and attain a comparable prediction performance. That is a win.
With feature hashing, we lose interpretability but gain a computational advantage.
Among all the features of the click log data, some are very weak signals in themselves. For example, gender itself doesn't tell us much regarding whether someone will click an ad, and the device model itself doesn't provide much information either.
However, by combining multiple features, we will be able to create a stronger synthesized signal. Feature interaction (also known as feature crossing) will be introduced for this purpose. For numerical features, it usually generates new features by multiplying two or more of them.
We can also define whatever integration rules we want. For example, we can generate an additional feature, income/person, from two original features, household income and household size:
household income | household size | income/person
300,000 | 2 | 150,000
100,000 | 1 | 100,000
400,000 | 4 | 100,000
300,000 | 5 | 60,000
200,000 | 2 | 100,000
Table 6.3: An example of generating a new numerical feature based on existing ones
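The income/person column can be derived with a one-line rule, as the following minimal sketch shows. The variable names are illustrative.

```python
# (household income, household size) pairs from the table above
households = [
    (300_000, 2),
    (100_000, 1),
    (400_000, 4),
    (300_000, 5),
    (200_000, 2),
]

# The new feature is a simple function of the two original features
income_per_person = [income / size for income, size in households]
print(income_per_person)
```

Any rule that combines existing columns into a more informative one works in the same way; the division here is just one example.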
For categorical features, feature interaction becomes an AND operation on two or more features. In the following example, we are generating an additional feature, gender:site_domain, from two original features, gender and site_domain:
gender | site_domain | gender:site_domain
male | cnn | male:cnn
female | abc | female:abc
male | nbc | male:nbc
male | facebook | male:facebook
female | abc | female:abc
Table 6.4: An example of generating a new categorical feature based on existing ones
We then use one-hot encoding to transform string values. On top of six one-hot encoded features (two from gender and four from site_domain), feature interaction between gender and site_domain adds eight further features (two by four).
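The following plain-Python sketch illustrates the crossing and the resulting feature counts. The helper cross is hypothetical, and the site_domain of the fourth row is taken to be facebook, as in the crossed column of Table 6.4.

```python
from itertools import product

genders = ["male", "female"]
site_domains = ["cnn", "abc", "nbc", "facebook"]


def cross(*values):
    """Join categorical values into a single crossed category."""
    return ":".join(values)


rows = [("male", "cnn"), ("female", "abc"), ("male", "nbc"),
        ("male", "facebook"), ("female", "abc")]
crossed = [cross(gender, domain) for gender, domain in rows]
print(crossed)

# Every possible gender:site_domain combination gets its own binary feature
all_crossed = [cross(g, d) for g, d in product(genders, site_domains)]
print(len(genders) + len(site_domains), len(all_crossed))
```

The number of crossed features is the product of the numbers of unique values, so crossing high-cardinality features can enlarge the feature space considerably.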
Let's now adopt feature interaction for our click prediction project. We will take two features, C14 and C15, as an example of an AND interaction:
First, we import the feature interaction module, RFormula, from PySpark:
>>> from pyspark.ml.feature import RFormula
An RFormula model takes in a formula that describes how features interact. For instance, y ~ a + b means it takes in features a and b, and predicts y based on them; y ~ a + b + a:b means it predicts y based on features a, b, and the interaction term, a AND b; y ~ a + b + c + a:b means it predicts y based on features a, b, c, and the interaction term, a AND b.
>>> cat_inter = ['C14', 'C15']
>>> cat_no_inter = [c for c in categorical if c not in cat_inter]
>>> concat = '+'.join(categorical)
>>> interaction = ':'.join(cat_inter)
>>> formula = "label ~ " + concat + '+' + interaction
>>> print(formula)
label ~ C1+banner_pos+site_id+site_domain+site_category+app_id+app_domain+app_category+device_model+device_type+device_conn_type+C14+C15+C16+C17+C18+C19+C20+C21+C14:C15
>>> interactor = RFormula(
... formula=formula,
... featuresCol="features",
... labelCol="label").setHandleInvalid("keep")
Again, the setHandleInvalid("keep") handle here makes sure it won't crash if any new categorical value occurs.
>>> interactor.fit(df_train).transform(df_train).select("features").show()
+--------------------+
| features|
+--------------------+
|(54930,[5,7,3527,...|
|(54930,[5,7,788,4...|
|(54930,[5,7,788,4...|
|(54930,[5,7,788,4...|
|(54930,[5,7,788,4...|
|(54930,[5,7,788,4...|
|(54930,[5,7,788,4...|
|(54930,[5,7,788,4...|
|(54930,[5,7,788,4...|
|(54930,[5,7,788,4...|
|(54930,[5,7,788,4...|
|(54930,[5,7,788,4...|
|(54930,[5,7,788,4...|
|(54930,[5,7,1271,...|
|(54930,[5,7,1271,...|
|(54930,[5,7,1271,...|
|(54930,[5,7,1271,...|
|(54930,[5,7,1532,...|
|(54930,[5,7,4366,...|
|(54930,[5,7,14,45...|
+--------------------+
only showing top 20 rows
More than 20,000 features are added to the feature space due to the interaction term of C14 and C15.
>>> classifier = LogisticRegression(maxIter=20, regParam=0.000,
...                                 elasticNetParam=0.000)
>>> stages = [interactor, classifier]
>>> pipeline = Pipeline(stages=stages)
>>> model = pipeline.fit(df_train)
>>> predictions = model.transform(df_test)
>>> predictions.cache()
>>> from pyspark.ml.evaluation import BinaryClassificationEvaluator
>>> ev = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",
...                                    metricName="areaUnderROC")
>>> print(ev.evaluate(predictions))
0.7490392990518315
An AUC of 74.90%, with additional interaction between features C14 and C15, is a marginal improvement over the 74.89% obtained without any interaction. Therefore, feature interaction boosts the model's performance only slightly here.
In this chapter, we continued working on the online advertising click-through prediction project. This time, we were able to train the classifier on the entire dataset with millions of records, with the help of the parallel computing tool Apache Spark. We discussed the basics of Spark, including its major components, the deployment of Spark programs, and the programming essentials of PySpark, the Python interface of Spark. Then, we programmed using PySpark to explore the click log data.
You learned how to perform one-hot encoding, cache intermediate results, develop classification solutions based on the entire click log dataset, and evaluate performance. In addition, I introduced two feature engineering techniques, feature hashing and feature interaction, in order to improve prediction performance. We had fun implementing them in PySpark as well.
Looking back on our learning journey, we have been working on classification problems since Chapter 2, Building a Movie Recommendation Engine with Naïve Bayes. Actually, we have covered all the powerful and popular classification models in machine learning. We will move on to solving regression problems in the next chapter; regression is the sibling of classification in supervised learning. You will learn about regression models, including linear regression, decision trees for regression, and support vector regression.
Can you try feature interaction on other features, such as C1 and C20?