As in the previous chapter, we need to encode the categorical features into sets of binary features by executing the following steps:
- In our case, the categorical features include the following:
>>> categorical = df_train.columns
>>> categorical.remove('label')
>>> print(categorical)
['C1', 'banner_pos', 'site_id', 'site_domain', 'site_category', 'app_id', 'app_domain', 'app_category', 'device_model', 'device_type', 'device_conn_type', 'C14', 'C15', 'C16', 'C17', 'C18', 'C19', 'C20', 'C21']
In PySpark, one-hot encoding is not as direct as in scikit-learn (specifically, its OneHotEncoder module); it requires several stages chained together.
- We first need to index each categorical column using the StringIndexer module:
>>> from pyspark.ml.feature import StringIndexer
>>> indexers = [
...     StringIndexer(inputCol=c,
...                   outputCol="{0}_indexed".format(c)).setHandleInvalid("keep")
...     for c in categorical
... ]
Calling setHandleInvalid("keep") ensures that the indexer won't crash if a previously unseen categorical value occurs; instead, such values are kept in an extra bucket. Try omitting it and you will see error messages about unseen labels when transforming new data.
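To make this behavior concrete, here is a minimal plain-Python sketch (not Spark code) of what StringIndexer with setHandleInvalid("keep") does conceptually: categories seen during fitting get indices 0 to k-1, and any unseen value at transform time is mapped to the extra index k instead of raising an error. The function names are hypothetical, for illustration only:

```python
from collections import Counter

def fit_string_indexer(values):
    """Assign an index to each distinct category, most frequent first
    (as StringIndexer does by default); ties are broken alphabetically
    here just to keep the sketch deterministic."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {v: i for i, v in enumerate(ordered)}

def transform_keep(mapping, value):
    """Map a value to its index; an unseen value goes to the extra
    bucket numLabels (the "keep" behavior) instead of failing."""
    return mapping.get(value, len(mapping))

mapping = fit_string_indexer(['a', 'b', 'a', 'c', 'a', 'b'])
print(mapping)                       # {'a': 0, 'b': 1, 'c': 2}
print(transform_keep(mapping, 'b'))  # 1
print(transform_keep(mapping, 'z'))  # 3 -- unseen value kept, no error
```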
- Then, we perform one-hot encoding on each individual indexed categorical column using the OneHotEncoderEstimator module:
>>> from pyspark.ml.feature import OneHotEncoderEstimator
>>> encoder = OneHotEncoderEstimator(
... inputCols=[indexer.getOutputCol() for indexer in indexers],
...     outputCols=["{0}_encoded".format(indexer.getOutputCol())
...                 for indexer in indexers]
... )
- Next, we concatenate all sets of generated binary vectors into a single one using the VectorAssembler module:
>>> from pyspark.ml.feature import VectorAssembler
>>> assembler = VectorAssembler(
... inputCols=encoder.getOutputCols(),
... outputCol="features"
... )
This creates the final encoded vector column called features.
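The concatenation step can be sketched in plain Python (not Spark code): each input one-hot vector is appended after the previous one, so its active index is shifted by the combined sizes of all vectors before it. The helper below is hypothetical and assumes, for simplicity, exactly one active index of value 1.0 per column:

```python
def assemble_sparse(vectors):
    """Each vector is a (size, active_index) pair; return the
    concatenated vector as (total_size, active_indices)."""
    indices, offset = [], 0
    for size, active in vectors:
        # shift the active index by the sizes of preceding vectors
        indices.append(offset + active)
        offset += size
    return (offset, indices)

# Three hypothetical one-hot columns of sizes 4, 3, and 5:
print(assemble_sparse([(4, 2), (3, 0), (5, 4)]))  # (12, [2, 4, 11])
```

This is why the final features vector is so wide: its size is the sum of the cardinalities of all encoded columns.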
- We chain all three types of stages (indexing, encoding, and assembling) together into a pipeline with the Pipeline module in PySpark, which better organizes our one-hot encoding workflow:
>>> stages = indexers + [encoder, assembler]
>>> from pyspark.ml import Pipeline
>>> pipeline = Pipeline(stages=stages)
- Finally, we can fit the one-hot encoding pipeline on the training set:
>>> one_hot_encoder = pipeline.fit(df_train)
- Once this is done, we use the trained encoder to transform both the training and testing sets. For the training set, we use the following code:
>>> df_train_encoded = one_hot_encoder.transform(df_train)
>>> df_train_encoded.show()
We skip displaying the full results here, as dozens of additional columns have been generated on top of those in df_train.
- Among them is the one we are looking for, the features column, which contains the one-hot encoded results. Hence, we select only this column along with the target variable:
>>> df_train_encoded = df_train_encoded.select(
...     ["label", "features"])
>>> df_train_encoded.show()
+-----+--------------------+
|label| features|
+-----+--------------------+
| 0|(31458,[5,7,3527,...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,1271,...|
| 0|(31458,[5,7,1271,...|
| 0|(31458,[5,7,1271,...|
| 0|(31458,[5,7,1271,...|
| 0|(31458,[5,7,1532,...|
| 0|(31458,[5,7,4366,...|
| 0|(31458,[5,7,14,45...|
+-----+--------------------+
only showing top 20 rows
The features column contains sparse vectors of size 31,458.
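The printed form, such as (31458,[5,7,3527,...], is Spark's notation for a SparseVector: the total size, the active indices, and (truncated here) their values. A quick plain-Python sketch with toy numbers shows how such a sparse representation expands to a dense vector; the helper name is hypothetical:

```python
def sparse_to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse representation into a
    dense list, with zeros everywhere except at the active indices."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense

print(sparse_to_dense(8, [1, 5], [1.0, 1.0]))
# [0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0]
```

Storing only the active indices is what makes a 31,458-dimensional one-hot vector cheap to keep in memory: only about 19 entries (one per categorical column) are nonzero in each row.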
- Don't forget to cache df_train_encoded, as we will be using it to iteratively train our classification model:
>>> df_train_encoded.cache()
DataFrame[label: int, features: vector]
- To release some space, we uncache df_train, since we will no longer need it:
>>> df_train.unpersist()
DataFrame[label: int, C1: string, banner_pos: string, site_id: string, site_domain: string, site_category: string, app_id: string, app_domain: string, app_category: string, device_model: string, device_type: string, device_conn_type: string, C14: string, C15: string, C16: string, C17: string, C18: string, C19: string, C20: string, C21: string]
- Now, we will repeat the preceding steps for the testing set:
>>> df_test_encoded = one_hot_encoder.transform(df_test)
>>> df_test_encoded = df_test_encoded.select(["label", "features"])
>>> df_test_encoded.show()
+-----+--------------------+
|label| features|
+-----+--------------------+
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,788,4...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,14,45...|
| 0|(31458,[5,7,2859,...|
| 0|(31458,[1,7,651,4...|
+-----+--------------------+
only showing top 20 rows
>>> df_test_encoded.cache()
DataFrame[label: int, features: vector]
>>> df_test.unpersist()
DataFrame[label: int, C1: string, banner_pos: string, site_id: string, site_domain: string, site_category: string, app_id: string, app_domain: string, app_category: string, device_model: string, device_type: string, device_conn_type: string, C14: string, C15: string, C16: string, C17: string, C18: string, C19: string, C20: string, C21: string]
- If you check the Spark UI at localhost:4040/jobs/ in your browser, you will see several completed jobs, such as the following: