Case study – logistic regression service

As an illustration of the architecture covered previously, let us look at an example of a prediction service that implements a logistic regression model. The model is both trained and used to score new data through information passed in URLs (either from a web browser or by invoking curl on the command line), illustrating how these components fit together. We will also examine how to interactively test these components using the same IPython notebooks as before, while still being able to seamlessly deploy the resulting code as an independent application.

Our first task is to set up the databases used to store the information used in modeling, as well as the results and model parameters.

Setting up the database

As a first step in our application, we will set up the database to store our training data and models, as well as the scores obtained for new data. The examples for this exercise consist of data from a marketing campaign, where the objective was to convince customers to subscribe to a term deposit (Moro, Sérgio, Paulo Cortez, and Paulo Rita. "A data-driven approach to predict the success of bank telemarketing." Decision Support Systems 62 (2014): 22-31). Thus, the objective with this data is to predict, based on a customer's feature variables, whether they are likely to sign up for the term deposit. The data is contained in the bank-full.csv file, which we need to load into MongoDB (https://www.mongodb.org/).

After installing MongoDB for your system, you can test the database by running the following command in your terminal:

$ mongod

The preceding command should start the database. Now, to import our training data, we can use the following command in a separate terminal window:

$ mongoimport -d datasets -c bank --type csv --file bank-full.csv --headerline

This will allow us to import the data into a database called 'datasets', in a collection called bank. We can test if the data has been successfully loaded by opening a mongo client in the terminal:

$ mongo

If we run the following commands, we should be able to see our dataset listed under the datasets database:

> use datasets
> show collections

We can verify that the data has been correctly parsed by examining one record:

> db.bank.findOne()

You can see that the record looks much like a Python dictionary. To retrieve records with particular values, we can pass a filter document of key-value pairs to findOne:

> db.bank.findOne({key: value, ...})

Now that we have the data loaded, we can interact with it through Python using the pymongo client. We initialize a client with access to the database we just created using the following:

>>> from pymongo import MongoClient
>>> MONGODB_HOST = 'localhost'
>>> MONGODB_PORT = 27017
>>> DBS_NAME = 'datasets'
>>> COLLECTION_NAME = 'bank'
>>> connection = MongoClient(MONGODB_HOST, MONGODB_PORT)
>>> collection = connection[DBS_NAME][COLLECTION_NAME]
>>> customers = collection.find(projection=FIELDS)  # FIELDS: a list of field names (or a dict of {field: True}) specifying which fields to return

Note that the mongod command still needs to be running in a separate terminal window for you to access the database through Python. The customers object will then contain each customer's records. While for the current example we will primarily access MongoDB using the Spark connector, the commands above will be useful in Chapter 9, Reporting and Testing – Iterating on Analytic Systems, when we analyze the output of our model. Indeed, the MongoDB database not only stores the information used by our model service, but can also serve as a source of shared data for the reporting service we will build in Chapter 9, Reporting and Testing – Iterating on Analytic Systems, to visualize the results of our modeling.
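For instance, a minimal sketch of a filtered query with this client might look like the following; the field names age and y are assumptions based on the bank marketing dataset's columns, which mongoimport takes from the CSV header:

>>> from pymongo import MongoClient
>>> bank = MongoClient('localhost', 27017)['datasets']['bank']
>>> bank.find_one({'age': {'$gt': 60}})  # one customer older than 60
>>> for record in bank.find({'y': 'yes'}).limit(5):  # five customers who subscribed
        print(record)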

As we mentioned previously, we will also use the Redis (http://redis.io/) key-value store to log the intermediate state of long-running tasks, and also to store the serialized output from training models in Spark. After installing Redis DB on your system, you should be able to start the server by typing the following command in the terminal:

 > redis-server

If successful, this should give output like the following:

[Screenshot: redis-server startup output]

The Python interface for Redis, the redis-py package (which, like many of the libraries we have seen in prior chapters, may be installed using pip or easy_install), is comparable to the MongoDB client. If we wanted to retrieve a record from our Redis database, or store new data, we could use the following commands to start a client and issue a query or an update:

>>> import redis
>>> r = redis.StrictRedis(host='localhost', port=6379, db=1)
>>> r.get(key)
>>> r.set(key,value)

When we start a new client using 'StrictRedis', we specify the port the redis-server is listening on (default of 6379) and the database identifier. By issuing get and set commands, we can respectively retrieve prior results or update the database with new information. As with the Python mongo client, we will need to have the redis-server command running in a separate command line window to allow us to issue commands to the database in Python.
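As a small preview of how we will use Redis later in this chapter, the sketch below stores a dictionary as a JSON string under a hypothetical task identifier and reads it back; the key and values here are purely illustrative:

>>> import json
>>> import redis
>>> r = redis.StrictRedis(host='localhost', port=6379, db=1)
>>> r.set('example-task-id', json.dumps({'weights': [0.5, -1.2], 'intercept': 0.1}))
>>> restored = json.loads(r.get('example-task-id'))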

Now that we have our databases set up, let us look at the server that will manage requests for the applications using this data.

The web server

As described previously, the web server receives requests and forwards them to the web application. For our example, we start the server using the main function:

>>> if __name__ == "__main__":
    modelparameters = json.loads(open(sys.argv[1]).readline())
    service = modelservice(modelparameters)
    run_server(service)

There are three steps: we read the parameters for this service (here, just the name of the algorithm used), which are passed as a command-line argument; create the web application (using the same parameter file, which is passed to its constructor); and then start the server. As you can see, the algorithm run by the prediction service is specified using a string argument. Later we will examine how this allows us to write a generic prediction service class, rather than a specific web application for each new algorithm we might use. When we start the server, it is registered on localhost on port 5000, as you can see by examining the body of the run_server function:

>>>  def run_server(app):
    import paste
    from paste.translogger import TransLogger
    app_ = TransLogger(app)
    cherrypy.tree.graft(app_, '/')
    cherrypy.config.update({
        'engine.autoreload.on': True,
        'log.screen': True,
        'server.socket_port': 5000,
        'server.socket_host': '0.0.0.0'
    })
    cherrypy.engine.start()
    cherrypy.engine.block()

There are a few key things happening in this function. Firstly, we see middleware in action: the TransLogger class from the paste library relays requests between the server and the application. The TransLogger object then represents a valid WSGI application since it wraps a callable (the application). We use the tree.graft command to attach this application (the model service itself) so that it is called by the CherryPy modelserver whenever it receives an HTTP request.

When we start the CherryPy server, we provide a few configurations. The engine.autoreload.on parameter controls whether the application refreshes when we change the source files it is pointing to, in this case our Flask application. log.screen directs error and access messages to stdout, which is useful while we are still debugging. Finally, the last two settings specify the host and port on which the application will listen for requests.

Once we start the engine, we also call block, which keeps the main process alive and serving requests until the engine is shut down. The server is thus reachable at http://0.0.0.0:5000 once it is running; this is the address where we will send our various commands to the prediction service. To start the server, type the following in the command line:

> python modelserver.py parameters.json

The parameters.json file could contain parameters for the modelservice application that are used when starting the modeling application; for now, it can simply contain an empty JSON object ({}), since we do not need any. If successful, you should see the following output in the terminal:

[Screenshot: CherryPy model server startup output]

As we issue curl commands to the server, we will see the responses displayed in this output as well.

The web application

Now that we have started the server and it can receive commands from clients, let us look at what our application executes in response, whether the requests are issued through the Python notebook or as curl commands from the terminal. The code that runs when we send requests to the CherryPy server is contained in the modelservice.py file.

The constructor for the application, called by the CherryPy server when we started it, returns an app object specified using the Flask framework:

>>> def modelservice(model_parameters):
    ...
    return app

What is the definition of app? If we examine the beginning of the modelservice.py file, we see that app is defined using the Flask library:

>>> app = Flask(__name__)
>>> app.config.update(CELERY_BROKER_URL='redis://localhost:6379',
        CELERY_RESULT_BACKEND='redis://localhost:6379')
>>> celery = Celery(app.import_name, backend=app.config['CELERY_RESULT_BACKEND'],
        broker=app.config['CELERY_BROKER_URL'])
>>> celery.conf.update(app.config)

In addition to creating the Flask object app, we also generate a celery object. What is this celery object? As mentioned previously, we do not want to have our clients wait on long-running tasks to respond, as this would cause the client applications to potentially hang or timeout. Thus, our application needs to be non-blocking and return an immediate value for a long-running task, which is an ID that allows us to access the progress and results of the task through a REST API. We want to run the long-running task in a secondary process and have it report back the results or intermediate state as they become available. For our application, we will be using the Celery library (http://www.celeryproject.org/), an asynchronous task queuing system that is ideal for this sort of application. Celery consists of a client that submits jobs to a queue, and worker tasks, which read from this queue, perform work, and return the results to the client. The client and workers communicate via a messaging queue, such as the Redis key-value store we mentioned previously, and results are also persisted to this database. The arguments CELERY_BROKER_URL and CELERY_RESULT_BACKEND are used to specify, respectively, where the worker tasks retrieve information on scheduled tasks, and where we can look up information on the status of currently running tasks. In our example, both functions are served by Redis, but we could substitute other systems, such as the message queue system RabbitMQ (https://www.rabbitmq.com/).
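For instance, swapping Redis for RabbitMQ as the broker would only require changing the broker URL when configuring the Flask application; a hedged sketch, assuming a RabbitMQ server running locally with its default guest credentials and keeping the results in Redis, would look like this:

>>> app.config.update(CELERY_BROKER_URL='amqp://guest:guest@localhost:5672//',
        CELERY_RESULT_BACKEND='redis://localhost:6379')
>>> celery = Celery(app.import_name, backend=app.config['CELERY_RESULT_BACKEND'],
        broker=app.config['CELERY_BROKER_URL'])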

In order for our HTTP requests to be handed off to the Celery worker tasks, we need to make sure that Redis is already running, and then start the Celery workers using the following command:

> celery worker -A modelservice.celery

This starts Celery worker processes with access to the tasks defined in modelservice.py, which we will cover below. If successful, you will see the following in your terminal:

[Screenshot: Celery worker startup output]

As we later send requests to the service which are passed off to the Celery workers, information (such as Spark outputs) will be printed in this window as well.

The flow of a prediction service – training a model

So now that we have the Celery process running along with the Flask application, how can we define the functions executed by the workers in response to our HTTP requests? How can we specify the URLs to which we will issue curl commands? We will illustrate the flow of events by showing how a call to the training function will kick off a series of Spark jobs to perform cross validation and store a LogisticRegression model.

We start by issuing a curl command to the train function with the following command:

curl -X POST http://0.0.0.0:5000/train/ -d @job.json --header "Content-Type: application/json"

We could have similarly used the Python requests library to transmit the information in job.json to the model training task. The job.json file contains all the parameters we might need to use in the various stages of parsing the data and training the model, as we will see as we walk through the flow of this request through our application. When this command is received by the CherryPy modelserver, it is forwarded to the Flask app defined in modelservice.py. How can we make the Flask application respond to this request? It is as easy as providing a decorator specifying a function to run in response to requests to this URL:

>>> @app.route("/train/",methods=["POST"])
… def train():
…    try:
 …       parsed_parameters = request.json
     …   trainTask = train_task.apply_async(args=[parsed_parameters])
     …   return json.dumps( {"job_id": trainTask.id } )
    except:
    …    print(traceback.format_exc())

The @app.route decorator indicates that the Flask object app listens for POST commands to the URL given as an argument to route. In response, it extracts the dictionary of parameters from the POST request and passes them to train_task, which will be run on a Celery worker process through the apply_async function. We then immediately return a task identifier associated with this task, which we can use to check its status or, as we will see, identify the output of the resulting model.
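As mentioned above, the same request could be issued with the Python requests library instead of curl; a minimal sketch, assuming the server is running locally and job.json is in the current directory, is shown below:

>>> import json
>>> import requests
>>> job = json.load(open('job.json'))
>>> response = requests.post('http://0.0.0.0:5000/train/', json=job)  # json= sets the application/json header
>>> job_id = response.json()['job_id']  # identifier returned by the train endpoint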

How do we specify the Celery task train_task? Similarly, we provide a decorator indicating that this function will be run on a worker process:

>>> @celery.task(bind=True)
def train_task(self, parameters):
    try:
        spark_conf = start_conf(parameters)
        model.set_model(parameters)
        messagehandler = MessageHandler(self)
        model.train(parameters, messagehandler=messagehandler, sc=spark_conf)
    except:
        messagehandler.update('FAILURE', traceback.format_exc())

There are a few important details here. First, along with annotating the function with @celery.task, we provide the argument bind=True. This ensures that the function has a 'self' argument. Why would we need a self argument? In our example, we attach a MessageHandler object to the training task using a reference to the task (self), allowing us to inject updates on the status of the task as it proceeds, and also to retrieve the identifier for the task that was returned after we issued the POST request. The MessageHandler class is relatively simple and is defined as follows in the messagehandler.py file in the code examples for this chapter:

>>> class MessageHandler:

    def __init__(self, parent):
        self.parent = parent
        self.task_id = parent.request.id

    def update(self, state, message):
        self.parent.update_state(state=state, meta={"message": message})

    def get_id(self):
        return self.task_id

When we construct the MessageHandler object, we retrieve the ID associated with the task from the parent.request.id field. If we had not used the bind=True argument above, we would not be able to access this field, since we would not have a reference (self) to the task object to pass to the MessageHandler. This reference is also needed for the update function, which injects status updates about the progress of the training task. Finally, if we need to access the training task identifier anywhere else in our application, we can do so using get_id.

How can we access the task status modified by update? If you recall, when we initialized the Celery application, we provided the Redis database as the storage location for task status information. Using the identifier returned in response to our POST request, we can use a GET request to look up the status of this task, which we expose through another Flask app endpoint:

>>> @app.route('/training/status/<task_id>')
def training_status(task_id):
    try:
        task = train_task.AsyncResult(task_id)
        message = ""
        if task.state == 'PENDING':
            response = {
                'status': task.status,
                'message': "waiting for job {0} to start".format(task_id)
            }
        elif task.state != 'FAILED':
            if task.info is not None:
                message = task.info.get('message', 'no message')
            response = {
                'status': task.status,
                'message': message
            }
        else:
            if task.info is not None:
                message = task.info.get('message', 'no message')
            response = {
                'status': task.status,
                'message': message
            }
        return json.dumps(response)
    except:
        print(traceback.format_exc())

Thus, using a curl command, we could issue a GET to obtain the status of our training task, either printing it to the console or, if we made this application more complex, using it to generate a dashboard of job states in a pipeline or system.
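For example, substituting the job_id returned by the train endpoint (shown here as a placeholder) into the URL, the status check is a simple GET:

> curl http://0.0.0.0:5000/training/status/<job_id>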

Now that we have a way to inject updates about the status of our tasks, let us return to the train_task definition. In addition to creating the MessageHandler for this task, we also generate a SparkConfiguration and initialize a model object. The SparkConfiguration will probably look familiar from some of the examples in previous chapters, and is returned from the following function:

>>> def start_conf(jobparameters):
    conf = SparkConf().setAppName("prediction-service")
    conf.set("spark.driver.allowMultipleContexts", True)
    conf.set("spark.mongodb.input.uri",
        jobparameters.get('inputCollection',
            "mongodb://127.0.0.1/datasets.bank?readPreference=primaryPreferred"))
    conf.set("spark.mongodb.output.uri",
        jobparameters.get('outputCollection',
            "mongodb://127.0.0.1/datasets.bankResults"))
    return conf

Note

Note that the arguments to the SparkConfiguration are used by the Spark mongo connector. This connector is an external dependency that needs to be downloaded and added at runtime to the system path of our Spark application, which can be accomplished by setting the following environment variable (assuming a Linux command-line environment):

export PYSPARK_SUBMIT_ARGS="--packages org.mongodb.spark:mongo-spark-connector_2.10:1.0.0 pyspark-shell"

Here we set the application name by which we will identify the train task in the Spark UI on port 4040, and allow multiple contexts through "spark.driver.allowMultipleContexts", such that several Spark contexts could potentially run in parallel. Finally, we provide the MongoDB input and output locations where Spark will read the data for training and store scored results. Note that both are given as defaults, but can be overridden by parameters in the job.json file, allowing our application to operate on different inputs and write to different output locations simply by changing the arguments of the POST request.
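For instance, a hypothetical job.json fragment redirecting the service to a different input and output collection (the collection names here are invented for illustration) might contain:

{
  "inputCollection": "mongodb://127.0.0.1/datasets.newCampaign?readPreference=primaryPreferred",
  "outputCollection": "mongodb://127.0.0.1/datasets.newCampaignResults"
}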

Now that we have the configuration to pass to the Spark job, let us look at the model object which will receive these parameters. We construct it as a global object at the beginning of the modelservice file in the line:

>>> model = ModelFactory()

If you examine the definition of the ModelFactory class in the modelfactory.py file supplied with the code examples for this chapter, you can see that it provides a generic interface for wrapping the training and prediction functions of different machine learning algorithms:

>>> class ModelFactory:

    def __init__(self):
        self._model = None

    def set_model(self, modelparameters):
        module = importlib.import_module(modelparameters.get('name'))
        model_class = getattr(module, modelparameters.get('name'))
        self._model = model_class(modelparameters)

    def get_model(self, modelparameters, modelkey):
        module = importlib.import_module(modelparameters.get('name'))
        model_class = getattr(module, modelparameters.get('name'))
        self._model = model_class(modelparameters)
        self._model.get_model(modelkey)

    def train(self, parameters, messagehandler, sc):
        self._model.train(parameters, messagehandler, sc)

    def predict(self, parameters, input_data):
        return self._model.predict(parameters, input_data)

    def predict_all(self, parameters, messagehandler, sc):
        self._model.predict_all(parameters, messagehandler, sc)

As you can see, nowhere in this class do we specify the particular implementation of the train or prediction tasks. Rather, we create an object with an internal member (self._model) that we can set using set_model, by dynamically retrieving the code associated with a particular algorithm using importlib. The "name" argument also comes from job.json, meaning we could load different algorithms into our application and run training tasks simply by changing the parameters of our POST request. In this example, we specify the model as LogisticRegressionWrapper, which will cause this module (and the class of the same name) to be loaded and inserted into the self._model of the ModelFactory when we call train_task. ModelFactory also has a generic method for loading an existing model, get_model, which takes as input a task ID such as the one generated in response to our train request and sets self._model to a previously trained model object retrieved using this task ID as a reference. In addition, the class has methods for predict (to give the predicted response for a single row of data) and predict_all (to perform bulk scoring using Spark).
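To illustrate this extensibility, a new algorithm could in principle be added by dropping another module into the project that exposes the same methods, with no change to the service code. The skeleton below is a purely hypothetical sketch (the RandomForestWrapper name and module are invented) showing the interface such a wrapper would need to satisfy:

>>> class RandomForestWrapper:
    # Hypothetical wrapper: any module and class sharing the name given in the
    # 'name' field of job.json can be loaded by ModelFactory via importlib.

    def __init__(self, modelparameters):
        self._model = None

    def train(self, parameters, messagehandler, spark_conf):
        pass  # fit the model with Spark and store it in Redis under the task ID

    def get_model(self, modelkey):
        pass  # de-serialize a previously trained model from Redis

    def predict(self, parameters, input_data):
        pass  # score a single parsed record

    def predict_all(self, parameters, messagehandler, spark_conf):
        pass  # bulk-score records with Spark and write the results to MongoDB

Saving such a file as RandomForestWrapper.py and setting "name": "RandomForestWrapper" in job.json would be enough for the factory to locate and instantiate it.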

To recap, we now see that in response to our POST request, the CherryPy server hands off the information in job.json to the train function of our Flask service, which starts a background process on a Celery worker. This worker process sets the generic model object of our Flask app to a Logistic Regression, creates a Spark configuration to run the training task, and returns a task ID that we can use to monitor the progress of the model training. In the final step in the journey of this POST request, let us see how the Logistic Regression model implements the training task.

In the LogisticRegressionWrapper.py file, you can see the specifications of the train task:

>>> def train(self, parameters, messagehandler, spark_conf):
    try:
        sc = SparkContext(conf=spark_conf, pyFiles=['modelfactory.py', 'modelservice.py'])
        sqlContext = SQLContext(sc)
        iterations = parameters.get('iterations', None)
        weights = parameters.get('weights', None)
        intercept = parameters.get('intercept', False)
        regType = parameters.get('regType', None)
        data = sqlContext.createDataFrame(
            sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").
            load().
            map(lambda x: DataParser(parameters).parse_line(x)))
        lr = LogisticRegression()
        pipeline = Pipeline(stages=[lr])
        paramGrid = ParamGridBuilder() \
            .addGrid(lr.regParam, [0.1]) \
            .build()
        crossval = CrossValidator(estimator=pipeline,
            estimatorParamMaps=paramGrid,
            evaluator=BinaryClassificationEvaluator(),
            numFolds=2)
        messagehandler.update("SUBMITTED", "submitting training job")
        crossvalModel = crossval.fit(data)
        self._model = crossvalModel.bestModel.stages[-1]
        self._model.numFeatures = len(data.take(1)[0]['features'])
        self._model.numClasses = len(data.select('label').distinct().collect())
        r = redis.StrictRedis(host='localhost', port=6379, db=1)
        r.set(messagehandler.get_id(), self.serialize(self._model))
        messagehandler.update("COMPLETED", "completed training job")
        sc.stop()
    except:
        print(traceback.format_exc())
        messagehandler.update("FAILED", traceback.format_exc())

First of all, we start a SparkContext using the parameters we defined in the SparkConfiguration passed to this function. The parameters in our job.json file also include the algorithm parameters, which we parse next. We then read the input data specified in the SparkConfiguration from MongoDB into a Spark DataFrame in a distributed fashion, using a lambda function to parse each input record. The parsing logic is defined in dataparser.py, in the parse_line function of the DataParser class:

>>> def parse_line(self, input, train=True):
    try:
        if train:
            if self.schema_dict.get('label').get('values', None) is not None:
                label = self.schema_dict.get('label').get('values').\
                    get(input[self.schema_dict.get('label').get('key')])
            else:
                label = input[self.schema_dict.get('label').get('key')]
        features = []
        for f in self.schema_dict['features']:
            if f.get('values', None) is not None:  # categorical feature
                cat_feature = [0] * len(f['values'].keys())
                if len(f['values'].keys()) > 1:  # 1-hot encoding
                    cat_feature[f['values'][str(input[f.get('key')])]] = 1
                features += cat_feature
            else:  # numerical feature
                features += [input[f.get('key')]]

        if train:
            Record = Row("features", "label")
            return Record(Vectors.dense(features), label)
        else:
            return Vectors.dense(features)

    except:
        print(traceback.format_exc())
        pass

The DataParser class takes as input a parameters dictionary containing the schema of the data that, once again, we specified in the job.json data included in our POST request. This information is stored in the schema_dict property of the parser. Using this information, the parse_line function extracts the label (the response column) and encodes it as a numeric value if necessary. Similarly, the features of each record are parsed and, if necessary, one-hot encoded using the information in the POST request. If the data is to be used in training (train=True), the parser returns the label and a vector of features; otherwise, it just returns the features to be used in scoring new records. In either case, the features are encoded as a dense Vector from the Spark ml library (which is required by the logistic regression algorithm), and in the training case the row is returned as a Row object to be compatible with the Spark DataFrame needed for the training code. Because the fields we use as features are specified in our job.json data, we could train models using different columns of the same dataset without changing the underlying code.
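Putting these pieces together, a hypothetical job.json for training on the bank data might look like the sketch below. The exact top-level key under which DataParser looks for the schema is not shown in the excerpts above, so the layout here is an assumption; the field names (y, age, balance, housing) come from the bank marketing CSV header, and the remaining entries follow the parameters we have already seen:

{
  "name": "LogisticRegressionWrapper",
  "schema": {
    "label": {"key": "y", "values": {"no": 0, "yes": 1}},
    "features": [
      {"key": "age"},
      {"key": "balance"},
      {"key": "housing", "values": {"no": 0, "yes": 1}}
    ]
  },
  "inputCollection": "mongodb://127.0.0.1/datasets.bank?readPreference=primaryPreferred",
  "outputCollection": "mongodb://127.0.0.1/datasets.bankResults"
}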

Once the data is parsed, we construct a Spark Pipeline object to handle the stages of the model training. In our example, the only stage is the model training itself, but we could also include transformations, such as the Vectorizers we examined in Chapter 6, Words and Pixels – Working with Unstructured Data in the context of text data, as part of such a pipeline. We then create a ParamGrid to perform a grid search over the regularization parameter of our model, and pass it to a CrossValidator, which will perform n-fold cross-validation to determine the best model. Once we have fit this model, we retrieve the optimal model from the CrossValidator results and determine the number of features and classes used in the model. Finally, we open a connection to the Redis database and store the parameters of this model after serializing it with the following function:

>>> def serialize(self, model):
    try:
        model_dict = {}
        model_dict['weights'] = model.weights.tolist()
        model_dict['intercept'] = model.intercept
        model_dict['numFeatures'] = model.numFeatures
        model_dict['numClasses'] = model.numClasses
        return json.dumps(model_dict)
    except:
        raise Exception("failed serializing model: {0}".format(traceback.format_exc()))

Notice that we use the MessageHandler attached to this task to retrieve the task ID, which is used as the key to store the serialized model in Redis. Also, though we store the result in the same Redis instance listening on port 6379 that is used by Celery to queue tasks and update the status of background tasks, we save to db 1 instead of the default 0 to separate the information.

By tracing through the steps above, you should now be able to see how a POST request can be translated into a series of commands that parse data, perform cross-validated grid-search to train a model, and then serialize that model for later use. You should also appreciate how the parameterizations at each layer allow us to modify the behavior of this training task purely by modifying the contents of the POST request, and how the modularity of the application will make it easy to extend to other models. We also have utilized Spark, which will allow us to easily scale our calculations to larger datasets over time.

Now that we have illustrated the logical flow of data in our prediction service, let us finish by examining the prediction functions, whose output we will use in Chapter 9, Reporting and Testing – Iterating on Analytic Systems.

On-demand and bulk prediction

Now that we have a trained model saved in our system, how can we use it to score new data? Our Flask app has two endpoints for this purpose. In the first, we make a POST request giving a single row of data as JSON, along with a model ID, and ask for a score from the logistic regression model:

>>> @app.route("/predict/",methods=['POST'])
… def predict():
…    try:
…        parsed_parameters = request.json
 …       model.get_model(parsed_parameters,parsed_parameters.get('modelkey'))
…        score = model.predict(parsed_parameters,parsed_parameters.get('record'))
…        return json.dumps(score)
…    except:
 …       print(traceback.format_exc())

This time, instead of calling the set_model method of ModelFactory, we use get_model to load a previously trained model, and then use it to predict the label of the input record and return the value. In the case of logistic regression, this will be a 0 or 1 value. While we do not provide a user interface in this example, we could imagine a simple form in which the user specifies a number of features of a record and submits them through a POST request, receiving back a prediction in real time.
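A hedged sketch of such a request using the requests library is shown below; the modelkey placeholder stands for the job_id returned by the train endpoint, and the record values are invented, following the same field names assumed for the training schema:

>>> import json
>>> import requests
>>> job = json.load(open('job.json'))  # reuse the training parameters, including the schema
>>> job['modelkey'] = '<job_id>'       # placeholder: the id returned by /train/
>>> job['record'] = {'age': 45, 'balance': 1200, 'housing': 'yes'}
>>> requests.post('http://0.0.0.0:5000/predict/', json=job).json()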

Looking at the implementation of get_model in LogisticRegressionWrapper, we see that we can retrieve and de-serialize the model we generated in the train task, and assign it to the self._model member of ModelFactory:

>>> def get_model(self, modelkey):
    try:
        r = redis.StrictRedis(host='localhost', port=6379, db=1)
        model_dict = json.loads(r.get(modelkey))
        self._model = LogisticRegressionModel(weights=Vectors.dense(model_dict['weights']),
            intercept=model_dict['intercept'],
            numFeatures=model_dict['numFeatures'],
            numClasses=model_dict['numClasses'])
    except:
        raise Exception("couldn't load model {0}: {1}".format(modelkey, traceback.format_exc()))

Subsequently, when we score a new record, we call the predict function to parse this record and use the de-serialized model to generate a prediction:

>>> def predict(self, parameters, input_data):
    try:
        if self._model is not None:
            return self._model.predict(DataParser(parameters).parse_line(input_data, train=False))
        else:
            return "Error, no model is trained to give predictions"
    except:
        print(traceback.format_exc())

This sort of functionality will be useful for interactive applications, such as a human user submitting a few records of interest to obtain predictions, or for real-time applications in which we might receive streaming input and provide predictions for immediate use. Note that though we do not use Spark in this particular case, we still have a nice opportunity for horizontal scaling: once we have trained the model, we could de-serialize the resulting parameters in several copies of the modelservice, which would allow us to avoid timeouts if we receive many requests. However, in cases where the volume of predictions required is large and the necessary latency is not real time, it may be more effective to use Spark to perform bulk scoring of the records in our database. We implement this bulk-scoring capability using a Celery task in a manner similar to train_task, specifying a predictall endpoint in the Flask app:

>>> @app.route("/predictall/",methods=["POST"])
… def predictall():
…    try:
…       parsed_parameters = request.json

…        predictTask = predict_task.apply_async(args=[parsed_parameters])
…        return json.dumps( {"job_id": predictTask.id } )
…    except:
…        print(traceback.format_exc())

The associated Celery task is shown below:

>>> @celery.task(bind=True)
def predict_task(self, parameters):
    try:
        spark_conf = start_conf(parameters)
        messagehandler = MessageHandler(self)
        model.get_model(parameters, parameters.get('modelkey'))
        print(model._model._model)
        model.predict_all(parameters, messagehandler=messagehandler, sc=spark_conf)
    except:
        messagehandler.update('FAILURE', traceback.format_exc())

Again, we create a SparkConfiguration and a MessageHandler and, as in the predict method, we use a prior model ID specified in job.json to load a previously trained model. We then call the predict_all method of this model to start a bulk scoring routine that will generate predictions for a large collection of data and store the results in the MongoDB collection specified by the output location parameter of the SparkConfiguration. For the LogisticRegressionWrapper, the predict_all method is shown below:

>>> def predict_all(self, parameters, messagehandler, spark_conf):
    try:
        sc = SparkContext(conf=spark_conf, pyFiles=['modelfactory.py', 'modelservice.py'])
        sqlContext = SQLContext(sc)
        Record = Row("score", "value")
        scored_data = sqlContext.createDataFrame(
            sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").
            load().
            map(lambda x: Record(self._model.predict(DataParser(parameters).parse_line(x, train=False)), x)))
        messagehandler.update("SUBMITTED", "submitting scoring job")
        scored_data.write.format("com.mongodb.spark.sql.DefaultSource").mode("overwrite").save()
        sc.stop()
    except:
        messagehandler.update("FAILED", traceback.format_exc())

As with the training task, we start a SparkContext using the SparkConfiguration we defined in the Celery task, and load the input from mongodb using the Spark connector. Instead of simply parsing the data, we score the parsed records using the de-serialized model we loaded using the get_model command, and pass both it and the original record into a new Row object, which now has two columns: the score and the input. We then save this data back to mongodb.

If you open the mongo client and examine the bankResults collection, you can verify that it now contains the bulk-scored input data. We will utilize these results in Chapter 9, Reporting and Testing – Iterating on Analytic Systems where we will expose these scores in a reporting application to visualize the ongoing performance of our model and diagnose potential issues in model performance.
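For example, you can verify the scored output from the mongo shell:

$ mongo
> use datasets
> db.bankResults.findOne()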
