Streaming clustering in Spark

Up to this point, we have mainly demonstrated examples for ad hoc exploratory analysis. In building up analytical applications, we need to begin putting these into a more robust framework. As an example, we will demonstrate the use of a streaming clustering pipeline using PySpark. This application will potentially scale to very large datasets, and we will compose the pieces of the analysis in such a way that it is robust to failure in the case of malformed data.

As we will be using similar examples with PySpark in the following chapters, let's review the key ingredients we need in such an application, some of which we already saw in Chapter 2, Exploratory Data Analysis and Visualization in Python. Most PySpark jobs we will create in this book consist of the following steps:

  1. Construct a Spark context. The context contains information about the name of the application, and parameters such as memory and number of tasks.
  2. The Spark context may be used to construct secondary context objects, such as the streaming context we will use in this example. This context object contains parameters specifically about a particular kind of task, such as a streaming dataset, and inherits all the information we have previously initialized in the base Spark context.
  3. Construct a dataset, which is represented in Spark as a Resilient Distributed Dataset (RDD). While from a programmatic standpoint we can operate on this RDD much as we would on, for example, a pandas dataframe, under the hood it is potentially parallelized across many machines during analysis. We may parallelize the data after reading it from a source file, or read it from a distributed file system such as Hadoop's HDFS. Ideally, we don't want to fail the whole job if one line of data is erroneous, so we would like to place an error-handling mechanism here that will alert us to a failure to parse a row without blocking the whole job.
  4. We frequently need to transform our input dataset into an RDD of LabeledPoint objects. A LabeledPoint contains a label (such as the cluster label for the clustering algorithms we have been studying in this chapter) and a vector of features. For our clustering problems, we will only perform this transformation when we predict (as we usually don't know the cluster ahead of time), but for the regression and classification models we will look at in Chapter 4, Connecting the Dots with Models – Regression Methods, and Chapter 5, Putting Data in its Place – Classification Methods and Analysis, the label is used as part of fitting the model.
  5. We'll frequently want a way to save the output of our modeling to be used by downstream applications, either on disk or in a database, where we can later query models indexed by history.

Let's look at some of these components using the Python notebook. Assuming we have Spark installed on our system, we'll start by importing the required dependencies:

>>> from pyspark import SparkContext
>>> from pyspark.streaming import StreamingContext
>>> from pyspark.mllib.linalg import Vectors
>>> from pyspark.mllib.regression import LabeledPoint
>>> from pyspark.mllib.clustering import StreamingKMeans

We can then test starting the SparkContext:

>>> sc = SparkContext('local', 'streaming-kmeans')

Recall that the first argument gives the URL for our Spark master, the machine that coordinates execution of Spark jobs and distributes tasks to the worker machines in a cluster. In this case, we will run locally, so we give this argument as 'local', but otherwise this could be the URL of a remote machine in our cluster. The second argument is just the name we give to our application. With a context running, we can also generate the streaming context, which contains information about our streaming application, using the following:

>>> ssc = StreamingContext(sc, 10)

The first argument is simply the SparkContext used as the parent of the StreamingContext; the second is the batch interval in seconds at which we will check our streaming data source for new data. If we expect data to arrive frequently, we could make this interval lower, or make it higher if we expect new data to be available less often.

Now that we have a StreamingContext, we can add data sources. Let's assume for now we'll have two sources: one for training data (which could be historical) and one for test data. We want the job not to die if it receives one line of bad data, so we use a Parser class that gives this flexibility:

>>> class Parser():
    def __init__(self, type='train', delimiter=',', num_elements=5, job_uuid=''):
        self.type = type
        self.delimiter = delimiter
        self.num_elements = num_elements
        self.job_uuid = job_uuid

    def parse(self, l):
        try:
            line = l.split(self.delimiter)
            if self.type == 'train':
                category = float(line[0])
                feature_vector = Vectors.dense([float(x) for x in line[1:]])
                return LabeledPoint(category, feature_vector)
            elif self.type == 'test':
                category = -1
                feature_vector = Vectors.dense([float(x) for x in line])
                return LabeledPoint(category, feature_vector)
            else:
                # log unrecognized parser types
                with open('/error_events/{0}.txt'.format(self.job_uuid), 'a') as f:
                    f.write('Unknown type: {0}\n'.format(self.type))
        except Exception:
            # log lines that fail to parse
            with open('/error_events/{0}.txt'.format(self.job_uuid), 'a') as f:
                f.write('Error parsing line: {0}\n'.format(l))
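The same guard pattern can be sanity-checked without a Spark cluster. The sketch below is plain Python (safe_parse is a hypothetical helper for illustration, not part of PySpark): malformed rows come back as None so they can be filtered out downstream instead of crashing the whole job.

```python
# Plain-Python sketch of the Parser's error-handling idea (no Spark needed).
# safe_parse is a hypothetical helper, not a PySpark API.
def safe_parse(line, delimiter=','):
    """Return a (label, features) tuple, or None if the row is malformed."""
    try:
        fields = line.split(delimiter)
        label = float(fields[0])
        features = [float(x) for x in fields[1:]]
        return (label, features)
    except ValueError:
        return None  # malformed row: skip it rather than fail the job

rows = ['1,0.5,0.7', 'bad,data,here', '2,1.1,0.9']
parsed = [p for p in (safe_parse(r) for r in rows) if p is not None]
# Only the two well-formed rows survive the filter.
```

Filtering out None results mirrors the role the try/except block plays inside the Spark map step: one bad line costs one record, not the batch.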

We log error lines to a file named with our job ID, which will allow us to locate them later if we need to. We can then use this parser to train and evaluate the model. To train the model, we move files with three columns (a label and the two coordinate features to be clustered) into the training directory. We can also add to the test data directory files with two columns containing only the coordinate features:

>>> from uuid import uuid4
job_uuid = str(uuid4())

num_features = 2
num_clusters = 3

training_parser = Parser('train', ',', num_features + 1, job_uuid)
test_parser = Parser('test', ',', num_features, job_uuid)

trainingData = ssc.textFileStream("/training_data")\
    .map(lambda x: training_parser.parse(x))\
    .filter(lambda x: x is not None)\
    .map(lambda x: x.features)
testData = ssc.textFileStream("/test_data")\
    .map(lambda x: test_parser.parse(x))\
    .filter(lambda x: x is not None)\
    .map(lambda x: x.features)
streaming_clustering = StreamingKMeans(k=num_clusters, decayFactor=1.0)\
    .setRandomCenters(num_features, 0.0, 0)
streaming_clustering.trainOn(trainingData)
streaming_clustering.predictOn(testData).pprint()
ssc.start()

The decay factor in the parameters gives the recipe for combining the current cluster centers with those computed from new data. At 1.0, all data from the beginning of the stream is weighted equally, while at the other extreme, 0, only the most recent data is used. If we stop the model at any point, we can inspect it using the latestModel() function:
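To make the decay factor's effect concrete, here is a small sketch of how MLlib's streaming k-means blends one old center with a new batch; update_center is a hypothetical illustration of the update rule, not a PySpark function.

```python
# Sketch of the streaming k-means center update for one center:
#   c' = (c * n * a + x * m) / (n * a + m),   n' = n * a + m
# where c is the old center with effective weight n, x is the mean of the
# m new points assigned to it, and a is the decay factor.
def update_center(c, n, x, m, a):
    new_n = n * a + m
    new_c = (c * n * a + x * m) / new_n
    return new_c, new_n

# decayFactor = 1.0: old and new points count equally
print(update_center(c=2.0, n=10, x=4.0, m=10, a=1.0))  # (3.0, 20.0)
# decayFactor = 0.0: the old center is forgotten entirely
print(update_center(c=2.0, n=10, x=4.0, m=10, a=0.0))  # (4.0, 10.0)
```

With a = 1.0 the center drifts toward the long-run mean of all points seen, while a = 0.0 snaps it to each new batch, which is the trade-off to tune against how quickly the underlying clusters move.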

>>> streaming_clustering.latestModel().clusterCenters

We could also predict using the predict() function on an appropriately sized vector:

>>> streaming_clustering.latestModel().predict([ … ])
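Under the hood, prediction is simply assignment to the nearest center by Euclidean distance. A minimal pure-Python sketch of that assignment (nearest_center is a hypothetical name used here for illustration) looks like this:

```python
import math

# Illustration of cluster assignment by Euclidean distance, mirroring
# what a k-means model's predict() does with its cluster centers.
def nearest_center(point, centers):
    """Return the index of the center closest to point."""
    distances = [math.dist(point, c) for c in centers]
    return distances.index(min(distances))

centers = [[0.0, 0.0], [5.0, 5.0], [10.0, 0.0]]
print(nearest_center([4.5, 4.0], centers))  # 1: closest to [5.0, 5.0]
```

The vector passed to predict() must have the same dimensionality as the centers, which is why num_features is fixed when the model is constructed.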