Scaling out with PySpark – predicting year of song release

To close, let us look at another example using PySpark. The dataset is a subset of the Million Song Dataset (Bertin-Mahieux, Thierry, et al. "The million song dataset." ISMIR 2011: Proceedings of the 12th International Society for Music Information Retrieval Conference, October 24-28, 2011, Miami, Florida. University of Miami, 2011), and the goal is to predict the year of a song's release based on the features of the track. The data is supplied as a comma-separated text file, which we can convert into an RDD using the Spark textFile() function. As in our earlier clustering example, we also define a parsing function with a try…except block so that a single malformed record does not cause the whole job to fail on a large dataset:

>>> def parse_line(l):
...     try:
...         return l.split(",")
...     except:
...         print("error in processing {0}".format(l))
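As a quick sanity check, we can apply the function to a single illustrative line (the values below are made up for demonstration and are not taken from the actual file); the first field is the release year and the remaining fields are audio features:

>>> parse_line("2001,49.94,21.47,73.08")
['2001', '49.94', '21.47', '73.08']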

We then use this function to map each line to the parsed format, splitting the comma-delimited text into individual fields, and convert the resulting rows into a Spark DataFrame:

>>> songs = sc.textFile('/Users/jbabcock/Downloads/YearPredictionMSD.txt')\
...     .map(lambda x: parse_line(x))\
...     .toDF()

We convert the resulting RDD into a DataFrame so that we can access its elements like a list or vector in Python. Next, we want to turn this into an RDD of LabeledPoint objects, just as we did with the Streaming K-Means example in the previous chapter:

>>> from pyspark.mllib.regression import LabeledPoint
>>> songs_labeled = songs.map( lambda x: LabeledPoint(x[0],x[1:]) )
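Note that if you are running Spark 2.0 or later, the Python DataFrame no longer exposes map() directly, so you would write songs.rdd.map(...) instead. Either way, it is worth inspecting the first record to confirm the conversion worked (the exact values depend on your copy of the file):

>>> songs_labeled.first()   # label is the release year; features are the remaining audio columns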

According to the documentation for this dataset, the training data (which excludes tracks from artists appearing in the test set) is contained in the first 463,715 rows, while the rest is the test data. To split it, we can use the zipWithIndex() function, which assigns a sequential index to each element, ordered within each partition and across partitions:

>>> songs_train = songs_labeled.zipWithIndex()\
...     .filter(lambda x: x[1] < 463715)\
...     .map(lambda x: x[0])
>>> songs_test = songs_labeled.zipWithIndex()\
...     .filter(lambda x: x[1] >= 463715)\
...     .map(lambda x: x[0])
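As a quick check that the split matches the documented layout, we can count the two RDDs; the training set should contain exactly 463,715 records, with the remainder forming the test set:

>>> songs_train.count()   # expected: 463715
>>> songs_test.count()    # the remaining records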

Finally, we can train a random forest model on this data using the following commands:

>>> from pyspark.mllib.tree import RandomForest
>>> rf = RandomForest.trainRegressor(songs_train, categoricalFeaturesInfo={},
...     numTrees=50, featureSubsetStrategy="auto", impurity="variance",
...     maxDepth=10, maxBins=32)
>>> prediction = rf.predict(songs_test.map(lambda x: x.features))
>>> predictedObserved = prediction.zip(songs_test.map(lambda lp: lp.label))
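Each element of predictedObserved is thus a (prediction, observation) pair; we can spot-check a few of them (the exact values will vary from run to run, since the forest is randomized):

>>> predictedObserved.take(5)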

To evaluate the accuracy of the resulting model, we can use the RegressionMetrics class, which takes an RDD of (prediction, observation) pairs:

>>> from pyspark.mllib.evaluation import RegressionMetrics
>>> RegressionMetrics(predictedObserved).r2
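The same RegressionMetrics object exposes several other error measures, so we can look at more than one view of model quality:

>>> metrics = RegressionMetrics(predictedObserved)
>>> metrics.rootMeanSquaredError
>>> metrics.meanAbsoluteError
>>> metrics.explainedVariance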

The distributed nature of PySpark means that this analysis will run, with the same code, on both the single example file on your computer and on a much larger dataset (such as the full million songs). If we wanted to save the random forest model (for example, to store a particular day's model for future reference in a database, or to distribute the model across multiple machines where it will be loaded from a serialized format), we can use the toString() function, whose output can optionally be compressed using gzip.
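As a minimal sketch of this last idea: in PySpark the model's string form is exposed as toDebugString() rather than toString(), and the output paths below are just placeholders; the model can also be persisted natively with save() and reloaded with load():

>>> import gzip
>>> # Write the human-readable form of the forest, gzip-compressed (path is an example)
>>> with gzip.open('/tmp/rf_model.txt.gz', 'wb') as f:
...     f.write(rf.toDebugString().encode('utf-8'))
>>> # Alternatively, persist the model natively and reload it later
>>> rf.save(sc, '/tmp/rf_model')
>>> from pyspark.mllib.tree import RandomForestModel
>>> same_rf = RandomForestModel.load(sc, '/tmp/rf_model')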
