In this chapter, we look into Apache Spark’s Structured Streaming feature. So far, we have seen the fluent APIs Apache Spark provides for batch processing. Typical ETL-based data flows are batch oriented and operate on static data, meaning the data has already been collected and is available for processing. While this is one side of the coin, the other side is streaming data. For example, websites such as Twitter and Facebook are continuously fed with data from their billions of users, and this data arrives at a rapid rate. If such a website needs to identify the most talked-about topic of the past hour, for example, it has to act very fast. It does not have time to consolidate the data, store it, and then apply ETL processing to it. Rather, it needs to apply analytics on the fly while the data is coming into the system. Such processing is called stream computation.
These kinds of computations are very helpful in scenarios such as monitoring and diagnosis, identifying trends in the data, and especially in the IoT world where there is an ever-flowing stream of data from which we have to predict device malfunction, for example. Apache Spark provides streaming APIs that enable you to receive such streaming data and apply analytics over it. This is a very interesting topic; let’s get into the recipes.
Recipe 8-1. Set up a streaming DataFrame on a directory
Recipe 8-2. Initiate a streaming query and see it in action
Recipe 8-3. Apply PySparkSQL on streaming
Recipe 8-4. Join streaming data with static data
Recipe 8-1. Set Up a Streaming DataFrame on a Directory
Problem
You want to create a streaming DataFrame on a directory.
Solution
File source: This is essentially when you receive the streaming data as incremental files. Spark supports file formats such as text, CSV, JSON, and Parquet. Creating a streaming DataFrame is very similar to creating one from a static file, but with a File source you create the stream on a directory into which you expect the incremental files to be placed. You might wonder why you would receive incremental files at all when tools such as Kafka exist. In the enterprise world, which uses a wide variety of technologies, including legacy systems such as mainframes, many systems can provide data only in the form of files. So it is essential to consider incremental files as a source of streaming. We will delve more into that later.
Kafka source: Kafka is a distributed streaming platform that has become the de facto standard in the Big Data world. Kafka is a messaging system built around a message broker that supports topics. Producers publish messages to these topics, which can in turn be consumed by consumers.
Socket source and Rate source: These sources are mainly intended for testing purposes. A Socket source reads data from a socket connection; for example, you can connect Apache Spark to a particular socket and consume the streaming data arriving at it. Similarly, a Rate source can be used to generate rows at a given frequency. Each generated row contains a timestamp and a value, where the timestamp is the time at which the row was generated and the value is a long containing a message count.
Let’s go ahead and create DataFrame streaming from a File source.
This data has seven rows, with the header row containing the column names.
How It Works
First and foremost, we need to create a schema and assign it to the newly created DataFrame.
As you can see in the code, you first need to import StructType. Then you create a struct type with the necessary columns along with their data types, as shown.
There are multiple things going on in this code. Let’s examine them one by one.
Initially, we call the readStream method. This method is the entry point for streaming: it returns a DataStreamReader that is used to read streaming data as a DataFrame.
Next, we set the separator to , (comma), because we are reading a comma-separated values file.
Then, using the schema method, we set the schema for the DataFrame. Finally, we specify the directory to which Spark should bind and in which it should look for streaming data.
Let’s try to print the schema and then register the DataFrame as a table or view and then show the contents of the DataFrame. Initially I have kept the directory empty. So you may not see any output from the DataFrame.
I am going to apply a simple SQL query to show the contents of the table.
The query fails with an AnalysisException stating that queries with streaming sources must be executed with writeStream.start(). Don’t worry; I wanted you to see what is going on so that you can resolve such issues when you encounter them yourself.
Spark streaming doesn’t start unless you call the start method on the output sink. We will look at setting up the sink in the next recipe. In this recipe, we covered Spark Structured Streaming, setting up a streaming DataFrame on a directory, and registering it as a table.
Recipe 8-2. Initiate a Streaming Query and See It in Action
Problem
You want to persist streaming data into a file.
Solution
As you saw in the previous recipe, unless you start a sink on a streaming DataFrame, the streaming is not really active. Let’s start with a simple streaming scenario and then move ahead step by step. To start with simple streaming, you need to understand some concepts.
File sink: The output is stored in a specified directory.
Kafka sink: Output is stored in Kafka topics. Output can go to more than one topic, and multiple consumers can listen to each of them for various purposes. For example, one topic could feed further processing, while another persists the data into Hadoop for archiving or future reference.
Console sink: Used for debugging purposes; prints the data to the console.
Append: This is the default mode that simply appends the incoming stream of data into the sink.
Complete: In this mode, the entire result is written to the sink after every trigger. This is needed when aggregation queries are performed on the streaming data; for example, when you need to determine which political leaders had the most mentions in the news over the last 15 minutes.
Update: Only the rows that were updated since the last trigger are written to the sink.
Now that you understand the output modes and sinks concepts, we will start the streaming and see these concepts in action.
To start, we are going to write the streaming data to the console so you can easily see the output and understand what is going on.
How It Works
With this code, we have created a streaming DataFrame. Now we are going to start it using the writeStream method .
So far you will not see any output. Now we place the tempData.csv file into the temperature-data directory. Notice what happens then.
Notice from the output that the tempData-3.csv file is printed to the console.
Now for the interesting part. Let’s now place the tempData-2.csv file. You might have noticed that the tempData-2.csv and tempData-1.csv files contain the same data, so they are effectively duplicates. In this scenario, you will not see any output in the console. You might wonder why, and this becomes clear from the next action.
Between tempData-2.csv and tempData-4.csv, I made a small change to day7’s temperature (it’s now 12.7 instead of the original 12.6). With that one change, you are able to see the output in the console.
Now we are going to see the streaming DataFrame represented as an in-memory table so that we can directly apply PySparkSQL queries to it.
You can see that we made two changes compared to the previous execution of this statement. First, we changed the format from console to memory, signifying that instead of writing the output to the console, we are asking Spark to keep the data in an in-memory table. Second, we used the queryName method to name that in-memory table so that it can be queried with SQL.
Recipe 8-3. Apply PySparkSQL on Streaming
Problem
You want to apply SQL filters on the streaming data.
Solution
We need to apply a filter to the same table.
How It Works
So with Spark’s Structured Streaming, we can treat the stream as a table, and Spark gives us data-wrangling capabilities with the ease of SQL. We can perform any SQL operation on this table. For example, you may want to join this data with static data so that you can enrich the incoming records, apply analytics, and get trend reports on specific aspects.
Recipe 8-4. Join Streaming Data with Static Data
Problem
You need to join streaming data with static data.
Solution
In this recipe, you need to join streaming data with static data to get more information from the streaming data. I have modified the data a little bit to be able to complete this recipe.
The data in Figure 8-6 is static data that represents ZipCode and City. For illustration purposes, I use eight cities.
time—Time at which the reading was taken
zipCode—ZIP code for which the reading was taken
tempInCelsius—Temperature reading in degrees Celsius
How It Works
Step 8-4-1. Creating a Static Table Over the Zip Code City Data File
Step 8-4-2. Creating a Streaming DataFrame for the Temperature Data for ZIP Code Files
Step 8-4-3. Join Static Data with Streaming Data
You can observe that the join between temperature_streaming and CityZipcode is reflected in the console output automatically. This is the power of Spark streaming. Instead of printing the output to the console, you can easily persist it into a file or keep it in memory and apply further analytics. With this chapter, you are able to successfully use Spark Structured Streaming to process streaming data.