In this chapter, we will discuss the pipeline pattern of model serving. In this pattern, we create a pipeline with a number of stages, each of which performs a specific function, such as data collection or feature extraction. The pipeline can be started by a scheduler or manually. We will discuss how to create a pipeline for serving machine learning (ML) models and demonstrate how the pipeline pattern can be used for this purpose. The high-level topics that will be covered in this chapter are as follows:
In this chapter, we will use Apache Airflow to demonstrate pipeline creation via the pipeline pattern. For all the resources, documentation, and tutorials on Apache Airflow, please visit their website: https://airflow.apache.org/. We will give a high-level overview of Apache Airflow and how to use it to create pipelines, but we will not cover details of the Apache Airflow infrastructure. Feel free to learn more about Apache Airflow to see how to create complex pipelines and workflows. Please install Apache Airflow following the instructions for your OS on their official website.
All the code for this chapter is provided at https://github.com/PacktPublishing/Machine-Learning-Model-Serving-Patterns-and-Best-Practices/tree/main/Chapter%209.
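If you get a ModuleNotFoundError while importing a library, install the missing module using the pip3 install <module_name> command. Note that the package name on PyPI sometimes differs from the import name; for example, sklearn is installed with pip3 install scikit-learn.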
In the pipeline pattern, we create a pipeline to serve an ML model. Instead of performing all the steps of the ML workflow in one central place, we separate the steps into different stages of a pipeline. A pipeline is a collection of stages connected in the form of a directed acyclic graph (DAG). We will start by describing DAGs in the following subsection and then describe how a pipeline is formed by connecting different stages as a DAG.
A DAG is a graph whose vertices (nodes) are connected by directed edges, and in which no path forms a cycle. A path is a sequence of edges in which each edge starts at the node where the previous edge ends, following the edge directions. A path forms a cycle if it starts and ends at the same node. In the figures, nodes are drawn as circles and edge directions are shown by arrows. To understand DAGs graphically, let's look at a few graphs that are not DAGs.
First of all, let’s look at the graph in Figure 9.1. This graph is not a DAG as one of the edges of this graph, AC, does not have a direction.
Figure 9.1 – A non-DAG as the edge AC does not have a direction
Therefore, every edge must have a direction for a graph to be a DAG. The name itself reflects this: the letter D stands for Directed, which can help you remember that the edges in the graph have directions.
Now let’s look at the graph in Figure 9.2. In this graph, again, all the edges have direction. However, the graph is still not a DAG.
Figure 9.2 – A non-DAG due to the presence of a cyclic path, CBD
The second letter, A, in DAG stands for Acyclic, which means that the graph can't contain any cyclic paths. The graph in Figure 9.2 has a cyclic path, CBD (colored red in the figure), so it is not a DAG. In this subsection, we have discussed some basic DAG concepts. In the next subsection, we will discuss the stages of a pipeline and how a stage can be represented as a node in a DAG.
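The acyclic property can also be checked in code. The following is a minimal sketch, not taken from the chapter, that detects cycles with a depth-first search over a graph stored as a dictionary from each node to its successors. The example graphs are hypothetical: the second one contains a cycle C → B → D → C like the one highlighted in Figure 9.2, but its other edges are assumptions rather than a copy of the figure.

```python
from typing import Dict, List


def has_cycle(graph: Dict[str, List[str]]) -> bool:
    """Return True if the directed graph (node -> list of successors) contains a cycle."""
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on the current DFS path, fully explored
    color = {node: WHITE for node in graph}
    for successors in graph.values():
        for node in successors:
            color.setdefault(node, WHITE)  # include nodes that only appear as targets

    def visit(node: str) -> bool:
        color[node] = GRAY
        for succ in graph.get(node, []):
            if color[succ] == GRAY:  # back edge: we reached a node on the current path
                return True
            if color[succ] == WHITE and visit(succ):
                return True
        color[node] = BLACK
        return False

    return any(color[n] == WHITE and visit(n) for n in list(color))


# Hypothetical graphs for illustration.
dag = {"A": ["B", "C"], "B": ["D"], "C": ["D"], "D": []}
cyclic = {"A": ["C"], "C": ["B"], "B": ["D"], "D": ["C"]}

print(has_cycle(dag))     # False
print(has_cycle(cyclic))  # True
```

A node is marked GRAY while it is on the current search path, so reaching a GRAY node again means the path has returned to where it started, which is exactly the definition of a cycle given above.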
A stage of a pipeline is like a node in a graph: each stage performs one unit of work. We can split the bigger task of serving an ML model end to end, starting with data collection, into multiple steps as follows:
Note
Please note that here serving is not decoupled from the pipeline. However, the main goal of serving is to provide customers with access to the model’s response, usually from the last stage of the pipeline. We can take the prediction created from the prediction stage directly or create some REST APIs to access the predictions.
The pipeline is formed by connecting stages in the form of a DAG. The graph needs to be acyclic because we do not want the pipeline to get stuck in a loop. If a cyclic path exists in the pipeline, the pipeline may keep cycling through the stages on that path and never finish. Therefore, we will use a DAG to create the pipeline. Using a DAG will ensure the following things:
In this section, we have learned about the pipeline pattern, DAGs, and how DAGs can be used to create pipelines via the pipeline pattern. In the next section, we will introduce you to Apache Airflow.
In this section, we will give you a high-level overview of Apache Airflow. To find out more details about Apache Airflow, read the documentation on their official website. The link to the official website is provided in the Technical requirements section.
Apache Airflow is a platform for creating workflows, or pipelines, in Python, scheduling them, and monitoring them through its web-based UI. To try the tool on your local machine and see how to create a pipeline, install Apache Airflow. Here are the steps I used to install Apache Airflow on macOS:
export AIRFLOW_HOME=/Users/mislam/Desktop/airflow-examples
pip install "apache-airflow[celery]==2.3.4" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.3.4/constraints-3.7.txt"
You can also install Apache Airflow in a virtual environment instead of your system-wide Python installation. Note that the constraints file in the preceding command (constraints-3.7.txt) matches Python 3.7, so replace 3.7 with your own Python version. For detailed, platform-specific instructions, follow the installation guide at https://airflow.apache.org/docs/apache-airflow/stable/installation/index.html.
To verify the installation, run the following command:
airflow version
You should see the Apache Airflow version number printed in the terminal. If you then go to the directory set as AIRFLOW_HOME, you will see some files that Airflow has created, as shown in Figure 9.3. There is no dags folder at first, because you need to create it manually.
Figure 9.3 – Apache Airflow created installation files
We have to create the dags folder manually and place all our DAGs in it. To see why, open the airflow.cfg file and look up the value of the dags_folder variable, as shown in Figure 9.4. Airflow loads DAGs from this folder. If you want to keep your DAGs in a different folder, change this path.
Figure 9.4 – The value of the dags_folder variable in the airflow.cfg file
Therefore, whenever we want to run a workflow and monitor it using the GUI, we need to place it inside the $AIRFLOW_HOME/dags folder.
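Because airflow.cfg is an INI-style file with dags_folder in its [core] section, you can check where Airflow will look for DAGs using Python's configparser. This is a minimal helper sketch, not an Airflow API. It assumes that airflow.cfg sits directly in AIRFLOW_HOME (falling back to Airflow's default ~/airflow when the variable is unset) and that a missing dags_folder key means the default <AIRFLOW_HOME>/dags.

```python
import configparser
import os
from pathlib import Path


def airflow_dags_folder(airflow_home: str = "") -> Path:
    """Return the dags_folder value from [core] in airflow.cfg (sketch, not an Airflow API)."""
    home = Path(airflow_home or os.environ.get("AIRFLOW_HOME", "~/airflow")).expanduser()
    parser = configparser.ConfigParser(interpolation=None)  # return values verbatim
    if not parser.read(home / "airflow.cfg"):
        raise FileNotFoundError(f"no airflow.cfg found in {home}")
    # Fall back to <AIRFLOW_HOME>/dags if the section or key is missing.
    return Path(parser.get("core", "dags_folder", fallback=str(home / "dags")))
```

Calling airflow_dags_folder() after exporting AIRFLOW_HOME as shown earlier should print the same path you see in Figure 9.4.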
Then, in the terminal, we run the following command to initialize the Airflow database that will be used to keep track of the users and the workflows:
airflow db init
Next, create an admin user account for signing in to the Airflow UI:
airflow users create --role Admin --username admin --email admin --firstname admin --lastname admin --password admin
Feel free to change these values. We use dummy values here only to get access to the UI for demonstration purposes.
Once Apache Airflow is installed, we can create a simple workflow to check that everything works. Save the following code as a Python file in the $AIRFLOW_HOME/dags folder:
from datetime import datetime, timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG(
    'example_tutorial',
    default_args={
        'depends_on_past': False,
        'email': ['airflow@example.com'],  # placeholder address; email alerts are disabled below
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2022, 1, 1),
    catchup=False,
    tags=['dummy'],
) as dag:
    # t1, t2 examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id='stage1',
        bash_command='python3 /Users/mislam/Desktop/airflow-examples/dags/stage1.py',
    )
    t2 = BashOperator(
        task_id='stage2',
        bash_command='python3 /Users/mislam/Desktop/airflow-examples/dags/stage2.py',
    )
    # stage1 must finish before stage2 starts
    t1 >> t2
In this code snippet, we first create a DAG using the DAG class provided by Airflow and set its ID to example_tutorial. We also pass other properties, such as tags=['dummy'] and schedule_interval=timedelta(days=1), which schedules one run per day starting from start_date. Setting catchup=False tells Airflow not to create runs for all the intervals between start_date and the current date. Using tags, we can filter the workflows in the dashboard, as shown in Figure 9.7.
Then, we use BashOperator (provided by Airflow) to create each stage of the pipeline. In Airflow terminology, stages are called tasks. We create two tasks, t1 and t2, with the IDs stage1 and stage2, respectively. In the first stage, we run a Python file, stage1.py, and in the second stage, we run another Python file, stage2.py.
The stage1.py file contains the following code snippet:
import time
print("I am at stage 1")
time.sleep(5)
print("After 5 seconds I am existing stage 1")
And the stage2.py file contains the following code snippet:
print("I am at stage 2")
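With the DAG file in place, start the Airflow scheduler and the web server. Both are long-running processes, so run each command in its own terminal, with AIRFLOW_HOME set in each:
airflow scheduler
airflow webserver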
We should see that the web server has been started, as shown in Figure 9.5:
Figure 9.5 – The Airflow web server has been started and is listening at http://0.0.0.0:8080 or http://localhost:8080
Figure 9.6 – The Sign In screen of the Airflow UI launched after visiting the URL http://localhost:8080/
Figure 9.7 – List of workflows shown in the Airflow UI after signing in
Figure 9.8 – Detailed view of the workflow first_airflow_pipeline
The home page of the workflow shows a lot of information: the history of runs, the next run time, the logs, a graphical representation of the pipeline, and more. In our example, we created a pipeline with two stages, stage1 and stage2, and the dashboard shows the pipeline along with the direction of the edge between the two stages. This direction is specified in the code: we set the direction from stage1 to stage2 with t1 >> t2 in the following code snippet:
t1 = BashOperator(
    task_id='stage1',
    bash_command='python3 /Users/mislam/Desktop/airflow-examples/dags/stage1.py',
)
t2 = BashOperator(
    task_id='stage2',
    bash_command='python3 /Users/mislam/Desktop/airflow-examples/dags/stage2.py',
)
t1 >> t2
If we want to reverse the direction, we replace t1 >> t2 with t2 >> t1. The >> and << operators look like arrowheads and are drawn as arrows in the pipeline graph, pointing in the same direction as the operator. Therefore, t1 >> t2 and t2 << t1 both mean that stage1 runs before stage2.
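Airflow supports this arrow syntax by overloading Python's shift operators on its tasks. The sketch below uses a hypothetical Task class, not Airflow's own implementation, to show the mechanism. It records which tasks run downstream of each task and also handles lists, the form used later in this chapter. Python falls back to the right-hand operand's __rrshift__ or __rlshift__ method when the left operand is a list.

```python
from typing import List, Set, Union

TaskOrList = Union["Task", List["Task"]]


def _as_list(tasks: TaskOrList) -> List["Task"]:
    return tasks if isinstance(tasks, list) else [tasks]


class Task:
    """Hypothetical stand-in for an Airflow task; it only records downstream task IDs."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.downstream: Set[str] = set()

    def __rshift__(self, other: TaskOrList) -> TaskOrList:
        # self >> other: self runs before other (or before every task in a list)
        for task in _as_list(other):
            self.downstream.add(task.task_id)
        return other  # returning the right-hand side enables chains like a >> b >> c

    def __lshift__(self, other: TaskOrList) -> TaskOrList:
        # self << other: other (or every task in a list) runs before self
        for task in _as_list(other):
            task.downstream.add(self.task_id)
        return other

    def __rrshift__(self, other: List["Task"]) -> "Task":
        # [a, b] >> self: lists have no >> for tasks, so Python calls this method
        for task in other:
            task.downstream.add(self.task_id)
        return self

    def __rlshift__(self, other: List["Task"]) -> "Task":
        # [a, b] << self: self runs before every task in the list
        for task in other:
            self.downstream.add(task.task_id)
        return self


stage1, stage2 = Task("stage1"), Task("stage2")
stage1 >> stage2
print(stage1.downstream)  # {'stage2'}
```

Writing stage2 << stage1 instead would record exactly the same edge, which is why reversing a dependency only requires flipping the operator or swapping the operands.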
Figure 9.9 – Button to start the workflow manually
After both stages have finished running, the borders of both stages turn green, as shown in Figure 9.10. The legend above the Auto-refresh button explains the meaning of each border color.
Figure 9.10 – All the stages of the pipeline completed
Later stages in the pipeline can only start after the previous stages have finished successfully. You can see this change of color interactively in the UI. For example, in our pipeline, stage2 will only start after stage1 is finished. We can confirm this by looking at the log. Let’s click on stage1 of the pipeline, and we will see a pop-up window, as shown in Figure 9.11:
Figure 9.11 – Pop-up window appears after clicking stage1 from which we can see the logs and execution history
Clicking the Log button shows the stage1 log, which includes the following two lines printed by the stage1.py script that runs inside stage1:
[2022-09-03, 06:00:07 UTC] {subprocess.py:92} INFO - I am at stage 1
[2022-09-03, 06:00:07 UTC] {subprocess.py:92} INFO - After 5 seconds I am existing stage 1
Similarly, we check the stage2 log and note the following line, which prints the output from the code in the stage2.py Python file, which is run inside stage2:
[2022-09-03, 06:00:12 UTC] {subprocess.py:92} INFO - I am at stage 2
The timestamps in the two logs show that stage2 produced its output after stage1 had finished. This is evidence that a later stage on a directed path starts only after the stages before it are complete.
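The same ordering check can be done programmatically. This sketch parses the bracketed timestamps from the log lines shown above and compares them. The regular expression is written for the exact format of those lines, and other Airflow logging configurations may format timestamps differently.

```python
import re
from datetime import datetime

# Log lines copied from the stage1 and stage2 task logs above.
stage1_exit = "[2022-09-03, 06:00:07 UTC] {subprocess.py:92} INFO - After 5 seconds I am existing stage 1"
stage2_start = "[2022-09-03, 06:00:12 UTC] {subprocess.py:92} INFO - I am at stage 2"

TIMESTAMP = re.compile(r"^\[(\d{4}-\d{2}-\d{2}, \d{2}:\d{2}:\d{2}) UTC\]")


def log_time(line: str) -> datetime:
    """Extract the timestamp at the start of a task log line in the format shown above."""
    match = TIMESTAMP.match(line)
    if match is None:
        raise ValueError(f"unrecognized log line: {line!r}")
    return datetime.strptime(match.group(1), "%Y-%m-%d, %H:%M:%S")


gap = log_time(stage2_start) - log_time(stage1_exit)
print(gap.total_seconds())  # 5.0
```

Both stage1 lines carry the same timestamp, most likely because Python block-buffers stdout when its output is piped to another process, so the two lines were flushed together when the script exited. Running the script with python3 -u disables this buffering and lets each line appear when it is printed.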
In this section, we introduced Apache Airflow, installed it, and saw how to view a pipeline in the Airflow UI, run it, and view its output. In the next section, we will use Airflow to create a dummy ML pipeline and see how to serve an ML model following the pipeline pattern.
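In this section, we will create a dummy ML pipeline. The pipeline will have the following stages:
1. init_data_dir: creates the directories for storing the data and the model
2. data_collection_1 and data_collection_2: collect dummy data from two sources
3. data_combiner: combines the collected data
4. training: trains a model on the combined data and saves it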
You can have multiple stages depending on the complexity of your end-to-end process. Let’s take a look:
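with DAG(
    'dummy_ml_pipeline',
    description='A dummy machine learning pipeline',
    schedule_interval="0/5 * * * *",
    tags=['ml_pipeline'],
    # the other parameters are the same as in the previous DAG and are omitted here
) as dag:
    # ... the task definitions shown below go here ...
    init_data_directory >> [data_collection_source1, data_collection_source2] >> data_combiner >> training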
In the preceding code snippet, we created a pipeline by connecting the tasks, or stages, using init_data_directory >> [data_collection_source1, data_collection_source2] >> data_combiner >> training. Between the first and second >> operators, we passed a list of two stages. Both stages in the list start after init_data_directory finishes and do not depend on each other, so they can run in parallel if the executor allows it. The data_combiner stage starts only after both data_collection_source1 and data_collection_source2 are complete. This is clearer when we look at the pipeline in the UI, as shown in Figure 9.12:
Figure 9.12 – A dummy pipeline with multiple dependencies
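To see which stages of this pipeline are ready at the same time, the following sketch groups the tasks into batches using the standard library's graphlib.TopologicalSorter (Python 3.9+) and its get_ready() and done() methods. The dependencies mirror the >> chain above, using the task IDs from this chapter. This is plain Python for illustration, not how Airflow's scheduler works internally.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# Each task maps to the tasks it depends on, mirroring:
# init_data_directory >> [data_collection_source1, data_collection_source2] >> data_combiner >> training
dependencies: Dict[str, Set[str]] = {
    "data_collection_1": {"init_data_dir"},
    "data_collection_2": {"init_data_dir"},
    "data_combiner": {"data_collection_1", "data_collection_2"},
    "training": {"data_combiner"},
}


def waves(graph: Dict[str, Set[str]]) -> List[Set[str]]:
    """Group tasks into batches whose dependencies are all satisfied by earlier batches."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if the graph is not a DAG
    batches = []
    while sorter.is_active():
        ready = set(sorter.get_ready())  # tasks whose predecessors have all finished
        batches.append(ready)
        sorter.done(*ready)  # pretend the whole batch finished successfully
    return batches


for number, batch in enumerate(waves(dependencies), start=1):
    print(number, sorted(batch))
# 1 ['init_data_dir']
# 2 ['data_collection_1', 'data_collection_2']
# 3 ['data_combiner']
# 4 ['training']
```

The second batch contains both data collection stages, while data_combiner appears only once both have been marked done. This is the many-to-one dependency that the list in the >> chain expresses.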
In the code snippet, we omitted the constructor parameters that are the same as in the pipeline we created before, and changed the id, description, schedule_interval, and tags parameters of the DAG. The schedule_interval parameter is now the cron expression "0/5 * * * *", whose minute field selects every fifth minute of every hour (minutes 0, 5, 10, ..., 55). As a result, the Airflow scheduler starts a new run of the pipeline every 5 minutes. This is how we can schedule a pipeline to run automatically at a fixed interval.
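As a rough illustration of what the minute field of "0/5 * * * *" selects, here is a simplified, hypothetical parser for a single cron field. It is not the parser Airflow uses. It handles *, single values, ranges, comma-separated lists, and an optional /step, and it reads a bare a/step as "start at a and step up to the field maximum". The more widely used spelling of the same schedule is "*/5 * * * *".

```python
from typing import List


def expand_cron_field(field: str, low: int, high: int) -> List[int]:
    """Expand one cron field into the sorted values it matches (simplified sketch)."""
    values = set()
    for part in field.split(","):
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if base == "*":
            start, end = low, high
        elif "-" in base:
            start_text, end_text = base.split("-", 1)
            start, end = int(start_text), int(end_text)
        else:
            start = int(base)
            end = high if step_text else start  # 'a/step' runs to the field maximum
        if start < low or end > high or step < 1:
            raise ValueError(f"invalid cron field part: {part!r}")
        values.update(range(start, end + 1, step))
    return sorted(values)


minute_field = "0/5 * * * *".split()[0]  # minute is the first of the five fields
minutes = expand_cron_field(minute_field, 0, 59)
print(minutes)  # [0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55]
```

With the remaining four fields set to *, every hour of every day matches, so the pipeline is triggered 12 times per hour.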
The five tasks connected in the pipeline are defined inside the same with DAG(...) block as follows:
init_data_directory = BashOperator(
    task_id='init_data_dir',
    bash_command='python3 /Users/mislam/Desktop/airflow-examples/dags/stages/init_data_dir.py',
)
data_collection_source1 = BashOperator(
    task_id='data_collection_1',
    bash_command='python3 /Users/mislam/Desktop/airflow-examples/dags/stages/data_collector_source1.py',
)
data_collection_source2 = BashOperator(
    task_id='data_collection_2',
    bash_command='python3 /Users/mislam/Desktop/airflow-examples/dags/stages/data_collector_source2.py',
)
data_combiner = BashOperator(
    task_id='data_combiner',
    bash_command='python3 /Users/mislam/Desktop/airflow-examples/dags/stages/data_combiner.py',
)
training = BashOperator(
    task_id='training',
    bash_command='python3 /Users/mislam/Desktop/airflow-examples/dags/stages/train.py',
)
Note
These operators use absolute paths to files on the machine on which this code was written. You will have to change these paths to match the locations on your own machine. The same applies to all the examples in this chapter.
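The init_data_dir.py file, which runs in the first stage, contains the following code:
import os
# Create the directory for the collected data if it does not exist yet
if not os.path.exists("/Users/mislam/Desktop/airflow-examples/dags/stages/data"):
    os.mkdir("/Users/mislam/Desktop/airflow-examples/dags/stages/data")
    print("The directory data is created")
# Create the directory for the trained model if it does not exist yet
if not os.path.exists("/Users/mislam/Desktop/airflow-examples/dags/stages/model_location"):
    os.mkdir("/Users/mislam/Desktop/airflow-examples/dags/stages/model_location")
    print("The directory model_location is created")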
This script creates two directories, data and model_location, which later stages use to store the data and the model, respectively.
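The existence checks in init_data_dir.py can be collapsed with os.makedirs(..., exist_ok=True). This call does not fail when the directory already exists, and it also creates any missing parent directories. The sketch below is an alternative to the author's script, not the code in the chapter's repository. It reads the base directory from a hypothetical PIPELINE_BASE_DIR environment variable, so the hard-coded /Users/mislam/... path only has to change in one place.

```python
import os
from typing import List

# Hypothetical environment variable; falls back to the current working directory.
BASE_DIR = os.environ.get("PIPELINE_BASE_DIR", os.getcwd())


def init_data_dirs(base_dir: str = BASE_DIR) -> List[str]:
    """Create the data and model_location directories if they do not already exist."""
    created = []
    for name in ("data", "model_location"):
        path = os.path.join(base_dir, name)
        os.makedirs(path, exist_ok=True)  # no error if the directory is already there
        created.append(path)
    return created
```

Calling init_data_dirs() at the bottom of init_data_dir.py gives the same result as the original script, and running the stage again on the next scheduled run is harmless.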
The data_collector_source1.py file contains the following code snippet:
import pandas as pd
df1 = pd.DataFrame({"x": [2, 2, 2, 3, 4, 5], "y": [1, 1, 1, 1, 1, 1]})
df1.to_csv("/Users/mislam/Desktop/airflow-examples/dags/stages/data/data1.csv")
This code writes some dummy data to a CSV file. The data_collector_source2.py file does something similar. The data_combiner.py file then combines the data from the two data collection stages and writes the combined data to a CSV file. Finally, in the training stage, we train a LogisticRegression model and save the trained model to the model_location folder created in the first stage.
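The combining step can be sketched as follows. The chapter's own data_combiner.py uses pandas. This version swaps in the standard library's csv module so that it runs without extra packages, and it assumes that all input files share the same header row. If the files were written with pandas' to_csv defaults, the index column is simply carried through as an ordinary column.

```python
import csv
from typing import List, Sequence


def combine_csv_files(input_paths: Sequence[str], output_path: str) -> int:
    """Concatenate CSV files that share a header row; return the number of data rows written."""
    if not input_paths:
        raise ValueError("need at least one input file")
    header: List[str] = []
    rows: List[List[str]] = []
    for index, path in enumerate(input_paths):
        with open(path, newline="") as f:
            reader = csv.reader(f)
            file_header = next(reader, [])
            if index == 0:
                header = file_header
            elif file_header != header:
                raise ValueError(f"{path} has columns {file_header}, expected {header}")
            rows.extend(reader)  # the remaining lines are data rows
    with open(output_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(rows)
    return len(rows)
```

For example, combine_csv_files([data_dir + "/data1.csv", data_dir + "/data2.csv"], data_dir + "/combined.csv") would merge the two collected files. The data2.csv and combined.csv names are assumptions, since the chapter only shows data1.csv.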
Figure 9.13 – Model is saved successfully after the execution of the pipeline is finished
Here, we saved the model to a local directory. In practice, we could save it to a location on the server that hosts the prediction APIs and create a prediction API that loads the model and uses it to make predictions.
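As a minimal sketch of that last step, the functions below store learned logistic-regression parameters as JSON in the model directory and apply the logistic-regression decision rule to new rows. The file name model.json and the {"weights": ..., "bias": ...} layout are hypothetical, because the chapter does not show how train.py serializes the model. If train.py saves a scikit-learn estimator with pickle or joblib, load it with the same library and call its predict method instead. A REST endpoint would then simply wrap predict.

```python
import json
import os
from typing import Dict, List, Sequence

# Hypothetical file name; the chapter does not show how train.py names the saved model.
MODEL_FILE = "model.json"


def save_params(params: Dict[str, object], model_dir: str) -> str:
    """Write learned logistic-regression parameters ({"weights": [...], "bias": b}) as JSON."""
    path = os.path.join(model_dir, MODEL_FILE)
    with open(path, "w") as f:
        json.dump(params, f)
    return path


def predict(model_dir: str, rows: Sequence[Sequence[float]]) -> List[int]:
    """Load the saved parameters and apply the logistic-regression decision rule.

    Class 1 is predicted when w . x + b > 0, which is when the sigmoid probability exceeds 0.5.
    """
    with open(os.path.join(model_dir, MODEL_FILE)) as f:
        params = json.load(f)
    weights, bias = params["weights"], params["bias"]
    return [int(sum(w * x for w, x in zip(weights, row)) + bias > 0) for row in rows]
```

Keeping the loading code separate from the training stage means the serving side only depends on the model_location folder. The pipeline can keep replacing the model on every scheduled run without changing the API code.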
Figure 9.14 – The pipeline is scheduled to run every 5 minutes using a cron expression
In this section, we have seen how to create an ML serving pipeline using Apache Airflow. In the next section, we will see some advantages and disadvantages of the pipeline pattern.
In this section, we will explore some of the advantages and disadvantages of the pipeline pattern.
First of all, the advantages of the pipeline pattern are as follows:
These advantages make the pipeline pattern a good choice when the ML application we want to deploy has many stages.
Some of the disadvantages of the pipeline pattern are as follows:
In this section, we have seen some of the advantages and disadvantages of the pipeline pattern. In the next section, we will summarize the chapter and conclude.
In this chapter, we explored the pipeline pattern for model serving and discussed how DAGs can be used to create a pipeline. We also covered some fundamental DAG concepts to help you understand what DAGs are.
Then, we introduced Apache Airflow, a tool for creating pipelines. We saw how to get started with Apache Airflow, how to use its operators to create a pipeline, how to define dependencies between stages, and how to implement separate stages as separate Python files.
We then created a dummy ML pipeline for collecting and combining data, training a model using the data, and then saving the model to a location that is accessible by the server. We explored how to create many-to-one dependencies among the stages for when a stage’s actions depend on the completion of multiple stages.
Finally, we discussed the advantages and disadvantages of the pipeline pattern. In the next chapter, we will discuss the ensemble pattern. We will see how multiple models can be combined and served.