© Raju Kumar Mishra and Sundar Rajan Raman 2019
Raju Kumar Mishra and Sundar Rajan Raman, PySpark SQL Recipes, https://doi.org/10.1007/978-1-4842-4335-0_8

8. Structured Streaming

Raju Kumar Mishra (1) and Sundar Rajan Raman (2)
(1) Bangalore, Karnataka, India
(2) Chennai, Tamil Nadu, India
 

In this chapter, we look into Apache Spark’s structured streaming feature. So far, we have seen the fluent APIs Apache Spark provides for batch processing of data. Typical ETL-based data flows are batch oriented and operate on static data: the data has already been collected and is available for processing. While this is one side of the coin, the other side is streaming data. For example, websites such as Twitter and Facebook are continuously fed with data from their billions of users. This data arrives at a rapid rate. If these websites need to identify the most talked-about topic for the past hour, for example, they have to act very fast. They will not have time to consolidate the data, store it, and then apply ETL processing to it. Rather, they need to apply the analytics on the fly while the data is coming into the system. Such processing is called stream computation.

These kinds of computations are very helpful in scenarios such as monitoring and diagnosis, identifying trends in the data, and especially in the IoT world where there is an ever-flowing stream of data from which we have to predict device malfunction, for example. Apache Spark provides streaming APIs that enable you to receive such streaming data and apply analytics over it. This is a very interesting topic; let’s get into the recipes.

As part of structured streaming, we are going to discuss the following recipes:
  • Recipe 8-1. Set up a streaming DataFrame on a directory

  • Recipe 8-2. Initiate a streaming query and see it in action

  • Recipe 8-3. Apply PySparkSQL on streaming

  • Recipe 8-4. Join streaming data with static data

Recipe 8-1. Set Up a Streaming DataFrame on a Directory

Problem

You want to create a streaming DataFrame on a directory.

Solution

As part of its structured streaming feature, Spark provides multiple streaming options. They are called sources in Spark terminology.
  • File source : This is essentially when you get the streaming data as incremental files. Spark supports file formats such as txt, csv, json, and parquet. It is very similar to creating a DataFrame from a static file, but with a File source you create a stream on a directory into which you expect the incremental files to be placed. You might wonder why there would be incremental files at all when Kafka and similar tools are available. In the enterprise world, which uses a wide variety of technologies, including legacy software such as mainframes, many systems will provide you with data in the form of files. So it is essential to consider incremental files as a source of streaming. We will delve more into that later.

  • Kafka source : Kafka is a distributed streaming platform that has become the de facto standard in the Big Data world. Kafka is a messaging system whose brokers organize messages into topics; producers publish messages to these topics, which in turn can be consumed by consumers.

  • Socket source and Rate source : These sources are mainly available for testing purposes. Socket sources can read data from socket connections. For example, you can connect Apache Spark to any particular socket to consume the streaming data coming to that socket. Similarly, Rate sources can be used to generate rows at any frequency. Each generated row contains a timestamp and a value, where the timestamp is the time at which the row was generated and the value is a long containing the message count. A minimal Rate source sketch is shown after this list.

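The following is a minimal sketch of the Rate source mentioned above, assuming only that a SparkSession named spark is available, as in the rest of this chapter. It generates two rows per second with the timestamp and value columns described in the list; the variable name rate_df is illustrative.
>>> rate_df = (
...   spark
...     .readStream
...     .format("rate")
...     .option("rowsPerSecond", 2)
...     .load()
... )
>>> rate_df.printSchema()  # shows the timestamp and value columns
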
Let’s go ahead and create a streaming DataFrame from a File source.

For this example, I will be using a small set of temperature data for easier understanding. The data contains just two columns, day and tempInCelsius. Both are string types. The day column contains the day for which the temperature was obtained. tempInCelsius denotes the observed temperature for that day. See Figure 8-1.
[Figure 8-1: Temperature dataset]

This data has seven data rows, plus a header row containing the column names.

How It Works

First and foremost, we need to create a schema and assign it to the streaming DataFrame we are about to create.

Use the following code to create a schema.
>>> from pyspark.sql.types import StructType
>>> temperatureSchema = StructType().add("day", "string").add("tempInCelsius", "string")

As you see in the code, you need to first import StructType. Then you need to create a struct type with the necessary columns along with their data types, as shown.
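
If you prefer, the same schema can also be built with explicit StructField objects; this is just an equivalent sketch, and the variable name temperatureSchemaAlt is illustrative.
>>> from pyspark.sql.types import StructField, StringType
>>> temperatureSchemaAlt = StructType([
...     StructField("day", StringType(), True),
...     StructField("tempInCelsius", StringType(), True)
... ])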

Let’s try to print the content of the schema:
>>> temperatureSchema.simpleString()
Here is the output. We are able to see both of the columns we just added to the schema. Now you are good to go to the next step.
'struct<day:string,tempInCelsius:string>'
The next step is to create the DataFrame from streaming. Let’s see the code for creating the DataFrame.
>>> temperature_streaming_df = (
...   spark
...     .readStream
...     .option("sep", ",")
...     .schema(temperatureSchema)
...     .csv("temperature-data")
... )

There are multiple things going on in this code. Let’s examine them one by one.

Initially we are calling the method readStream. This method is essential to creating the streaming. This method returns a DataStreamReader used to read streaming data as a DataFrame.

Next, we are setting the separator to , (comma). This is because the data is comma-separated.

To make sure the DataFrame is in streaming mode, we check the isStreaming property, as shown here.
>>> temperature_streaming_df.isStreaming
Here is the output telling us that the DataFrame is streaming:
True

Then, using the schema method, we set the schema for the DataFrame. Finally, we specify the directory to which Spark should bind and in which it should look for incoming streaming data.

Let’s print the schema, register the DataFrame as a table (view), and then try to show the contents of the DataFrame. Initially, I have kept the directory empty, so you may not see any output from the DataFrame.

Printing the schema
>>> temperature_streaming_df.printSchema()
Here is the output. The output is as expected, with the two given columns.
root
 |-- day: string (nullable = true)
 |-- tempInCelsius: string (nullable = true)
Now we register the DataFrame as a table.
>>> temperature_streaming_df.createOrReplaceTempView("temperature_streaming_df")

I am going to apply a simple SQL query to show the contents of the table.

Let’s print the contents of the DataFrame.
>>> spark.sql("select * from temperature_streaming_df").show()
Alas, you see a long list of exceptions, ending with this final message.
pyspark.sql.utils.AnalysisException: 'Queries with streaming sources must be executed with writeStream.start();; FileSource[temperature-data]'

Don’t worry; I wanted you to understand what is going on so that you are able to solve such issues when you run into them on your own.

Spark streaming doesn’t start unless you call the start method on an output sink. We will look at setting up the sink in the next recipe. In this recipe, we covered Spark structured streaming, setting up a streaming DataFrame on a directory, and registering it as a table.

Recipe 8-2. Initiate a Streaming Query and See It in Action

Problem

You want to persist streaming data into a file.

Solution

As you saw in the previous recipe, unless you start a sink on a streaming DataFrame, the streaming is not really active. Let’s start with a simple streaming scenario and then move ahead step by step. To start with simple streaming, you need to understand some concepts.

Sinks are where you write your output data. There are multiple sink types that Spark supports—File, Kafka, Console, foreach, and Memory. We will look into the File, Kafka, and Console sink types.
  • File sink : The output is stored in a specified directory.

  • Kafka sink : Output is stored in Kafka topics. The output can go to more than one topic, and multiple consumers can listen to them for various purposes. For example, one topic could be for further processing and another one could be for persisting the data into Hadoop for archiving or for future reference.

  • Console sink : Used for debugging purposes to print the data into the console.

Let’s look at the output modes. The output mode defines how to write the data into the sink. Spark supports three kinds of output modes—Append, Complete, and Update.
  • Append: This is the default mode that simply appends the incoming stream of data into the sink.

  • Complete: This mode writes the complete in-memory result into the sink on every trigger. This is needed when aggregation queries are performed on the streaming data; for example, when you need to determine which political leaders have the maximum mentions in the news for the last 15 minutes. A short sketch of this mode follows this list.

  • Update: Only the rows that were updated since the last trigger are written into the sink.

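As a quick illustration of the Complete mode, here is a minimal sketch, assuming a streaming DataFrame such as the temperature_streaming_df created later in this recipe. It counts readings per day and rewrites the full aggregation result to the console on every trigger; the variable names are illustrative.
>>> day_counts_df = temperature_streaming_df.groupBy("day").count()
>>> agg_query = (
...   day_counts_df
...     .writeStream
...     .format("console")
...     .outputMode("complete")
...     .start()
... )
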
Now that you understand the output modes and sinks concepts, we will start the streaming and see these concepts in action.

To start, we are going to write the streaming data into the console so you can easily see the output and understand what is going on.

I will be using four files for illustration purposes, as shown in Figures 8-2 through 8-5.
[Figure 8-2: tempData-1.csv]

[Figure 8-3: tempData-2.csv]

[Figure 8-4: tempData-3.csv]

[Figure 8-5: tempData-4.csv]

How It Works

Be sure to observe the contents of the files carefully.
>>> from pyspark.sql.types import StructType
>>> temperatureSchema = StructType().add("day", "string").add("tempInCelsius", "string")
>>> temperature_streaming_df = (
...   spark
...     .readStream
...     .option("sep", ",")
...     .schema(temperatureSchema)
...     .csv("C:/book-author/data/data/temperature-data")
... )

With this code, we have created a streaming DataFrame. Now we are going to start it using the writeStream method.

The following code starts the streaming. This is a very simple form of streaming: we create a streaming query using the writeStream method, which will simply write to the console and keep appending the incoming data.
>>> query = (
...   temperature_streaming_df
...     .writeStream
...     .format("console")
...     .outputMode("append")
...     .start()
... )

So far you will not see any output. Now we place the tempData-1.csv file into the temperature-data directory. Notice what happens then.

You can observe the following output in the console.
-------------------------------------------
Batch: 0
-------------------------------------------
+----+-------------+
| day|tempInCelsius|
+----+-------------+
|day1|         12.2|
|day2|         13.1|
|day3|         12.9|
|day4|         11.9|
|day5|           14|
|day6|         13.9|
|day7|         12.7|
+----+-------------+
Now we place the tempData-3.csv file. Here is the output:
-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-------------+
|  day|tempInCelsius|
+-----+-------------+
| day8|         12.2|
| day9|         13.1|
|day10|         12.9|
|day11|         11.9|
|day12|           14|
|day13|         13.9|
|day14|         12.7|
+-----+-------------+

Notice from the output that the tempData-3.csv file is printed to the console.

Now for the interesting part. Let’s now place the tempData-2.csv file. You might have noticed that the tempData-2.csv and tempData-1.csv files contain the same data. So ideally they are duplicates. In this scenario, you will not see any output in the console. You might wonder why you are not seeing any output, and this can be easily understood from the next action.

Between tempData-2.csv and tempData-4.csv, I made a small change to day7’s temperature (it’s now 12.8 instead of the original 12.7). With that one change, you are able to see the output in the console.

Spark provides an exactly-once guarantee, which means that each record will be processed exactly once. None of the data will be lost in the process and none of the data will be processed more than once. This is a very strong feature that Spark provides compared to other streaming solutions.
-------------------------------------------
Batch: 2
-------------------------------------------
+----+-------------+
| day|tempInCelsius|
+----+-------------+
| day|tempInCelsius|
|day1|         12.2|
|day2|         13.1|
|day3|         12.9|
|day4|         11.9|
|day5|           14|
|day6|         13.9|
|day7|         12.8|
+----+-------------+

Now we are going to see the streaming DataFrame represented as an in-memory table so that we can directly apply PySparkSQL queries to it.

Here is the code:
>>> query = (
...   temperature_streaming_df
...     .writeStream
...     .format("memory")
...     .queryName("TemperatureData")
...     .outputMode("append")
...     .start()
... )

You can see that we made two changes compared to the previous execution of this statement. First, we changed the format from console to memory. This is to signify that instead of writing the output to the console, we are asking Spark to save the data in memory.

Next, we call a new method called queryName and provide TemperatureData as its input. This will create a table representation over the in-memory data. Here are the contents of the table using PySparkSQL:
>>> spark.sql("select * from TemperatureData").show()
Here is the output. You can see that it is empty. It is because we have cleared all the data in the directory.
+---+-------------+
|day|tempInCelsius|
+---+-------------+
+---+-------------+
Now we add the tempData-1.csv and tempData-3.csv files into the directory. Let’s look at the output after adding both the files.
>>> spark.sql("select * from TemperatureData").show()
Here is the output:
+-----+-------------+
|  day|tempInCelsius|
+-----+-------------+
| day8|         12.2|
| day9|         13.1|
|day10|         12.9|
|day11|         11.9|
|day12|           14|
|day13|         13.9|
|day14|         12.7|
| day1|         12.2|
| day2|         13.1|
| day3|         12.9|
| day4|         11.9|
| day5|           14|
| day6|         13.9|
| day7|         12.7|
+-----+-------------+
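Since the problem statement of this recipe is about persisting streaming data into a file, here is a minimal sketch of a File sink using the same temperature_streaming_df. The output directory and checkpoint directory names are hypothetical; a File sink requires a checkpoint location so Spark can track its progress, and it supports only the Append output mode.
>>> file_query = (
...   temperature_streaming_df
...     .writeStream
...     .format("csv")
...     .option("path", "temperature-output")
...     .option("checkpointLocation", "temperature-checkpoint")
...     .outputMode("append")
...     .start()
... )
Each micro-batch is then written as CSV files under the temperature-output directory.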

Recipe 8-3. Apply PySparkSQL on Streaming

Problem

You want to apply SQL filters on the streaming data.

Solution

We can apply a SQL filter directly to the TemperatureData in-memory table registered in the previous recipe.

How It Works

Here is the query where we filter for days with a tempInCelsius greater than 13. Since the tempInCelsius column is of string type, we cast it to double.
>>> spark.sql("select * from TemperatureData where cast(tempInCelsius as double) > 13").show()
Here is the output:
+-----+-------------+
|  day|tempInCelsius|
+-----+-------------+
| day9|         13.1|
|day12|           14|
|day13|         13.9|
| day2|         13.1|
| day5|           14|
| day6|         13.9|
+-----+-------------+

So with Spark’s structured streaming, we can treat the stream as a table, and Spark gives us data-wrangling capabilities with the ease of SQL. Similarly, we can perform other SQL operations on this table; see the aggregation sketch below. You may also want to join this data with static data so you can enrich the incoming data, apply analytics, and get trend reports on specific aspects.
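For instance, here is a simple aggregation over the same in-memory table, a sketch that assumes the TemperatureData table from Recipe 8-2 is still registered. It computes the average temperature across all rows received so far.
>>> spark.sql("select avg(cast(tempInCelsius as double)) as avgTemp from TemperatureData").show()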

Recipe 8-4. Join Streaming Data with Static Data

Problem

You need to join streaming data with static data.

Solution

In this recipe, you need to join streaming data with static data to get more information from the streaming data. I have modified the data a little bit to be able to complete this recipe.

Figure 8-6 shows a snapshot of the static data used in this recipe.
[Figure 8-6: Static data]

The data in Figure 8-6 is static data that maps ZipCode to City. For illustration purposes, I use eight cities.

The streaming data is going to be temperature data that will come in every minute for each of the cities. Take a look at the temperature data snapshot in Figures 8-7 through 8-9.
[Figure 8-7: The zipcode-temp-1.csv file]

[Figure 8-8: The zipcode-temp-2.csv file]

[Figure 8-9: The zipcode-temp-3.csv file]

As you can see in these figures, these are the three files I am going to stream. Each file contains three columns:
  • time—Time at which the reading was taken

  • zipCode—ZIP code for which the reading was taken

  • tempInCelsius—Temperature reading in degrees Celsius

How It Works

Step 8-4-1. Creating a Static Table Over the Zip Code City Data File

The following code creates that table.
>>> city_zipcode_df = spark.read.csv('city-zipcode.csv',header=True, inferSchema=True)
>>> city_zipcode_df.createOrReplaceTempView("CityZipcode")
>>> spark.sql("select * from CityZipcode").show()
Here is the output:
+-------+------------+
|Zipcode|        City|
+-------+------------+
|   8817|      EDISON|
|  10801|NEW ROCHELLE|
|   8053|     MARLTON|
|  15554|   NEW PARIS|
|  45874|   OHIO CITY|
|  45347|   NEW PARIS|
|  59547|      ZURICH|
|  66101| KANSAS CITY|
+-------+------------+

Step 8-4-2. Creating a Streaming DataFrame for the ZIP Code Temperature Data Files

In the following code, we create the schema and then use it to create a streaming DataFrame. We also register the streaming DataFrame as a temporary view named temperature_streaming, so that the SQL join in the next step can reference it.
>>> temperatureSchema = StructType().add("time", "timestamp").add("ZipCode", "string").add("tempInCelsius", "double")
>>> temperature_streaming_df = (
...   spark
...     .readStream
...     .option("sep", ",")
...     .schema(temperatureSchema)
...     .csv("zipcode-temp")
... )
>>> temperature_streaming_df.createOrReplaceTempView("temperature_streaming")

Step 8-4-3. Join Static Data with Streaming Data

Now we are ready to apply the join. Let’s apply the join using PySparkSQL.
>>> tempDF = spark.sql("select City, T.zipCode, tempInCelsius from temperature_streaming T JOIN CityZipcode C on (C.ZipCode = T.Zipcode)")
Here, we are joining the two tables we created. Then we start a streaming query on the joined DataFrame, writing the results to the console:
>>> query = (
...   tempDF
...     .writeStream
...     .format("console")
...     .outputMode("append")
...     .start()
... )
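For reference, the same stream-static join could also be expressed with the DataFrame API instead of SQL. This is just an equivalent sketch using the two DataFrames created earlier and is not required for the rest of this recipe.
>>> joined_df = temperature_streaming_df.join(
...     city_zipcode_df,
...     temperature_streaming_df["ZipCode"] == city_zipcode_df["Zipcode"]
... ).select("City", temperature_streaming_df["ZipCode"], "tempInCelsius")
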
Now we place the files one by one. Upon placing the zipcode-temp-1.csv file, here is the output at the console:
-------------------------------------------
Batch: 0
-------------------------------------------
+------------+-------+-------------+
|        City|zipCode|tempInCelsius|
+------------+-------+-------------+
|      EDISON|   8817|         12.1|
|NEW ROCHELLE|  10801|         13.9|
|     MARLTON|   8053|         11.7|
|   NEW PARIS|  15554|         13.0|
|   OHIO CITY|  45874|         13.3|
|   NEW PARIS|  45347|         12.4|
|      ZURICH|  59547|         10.5|
| KANSAS CITY|  66101|         10.6|
+------------+-------+-------------+
Upon placing the zipcode-temp-2.csv file, here is the output at the console:
-------------------------------------------
Batch: 1
-------------------------------------------
+------------+-------+-------------+
|        City|zipCode|tempInCelsius|
+------------+-------+-------------+
|      EDISON|   8817|         12.1|
|NEW ROCHELLE|  10801|         13.9|
|     MARLTON|   8053|         11.7|
|   NEW PARIS|  15554|         13.0|
|   OHIO CITY|  45874|         13.3|
|   NEW PARIS|  45347|         12.4|
|      ZURICH|  59547|         10.5|
| KANSAS CITY|  66101|         10.6|
+------------+-------+-------------+
Finally, upon placing the zipcode-temp-3.csv file, here is the output at the console:
-------------------------------------------
Batch: 2
-------------------------------------------
+------------+-------+-------------+
|        City|zipCode|tempInCelsius|
+------------+-------+-------------+
|      EDISON|   8817|         14.5|
|NEW ROCHELLE|  10801|         12.9|
|     MARLTON|   8053|         11.6|
|   NEW PARIS|  15554|         10.0|
|   OHIO CITY|  45874|         13.6|
|   NEW PARIS|  45347|         11.3|
|      ZURICH|  59547|         10.2|
| KANSAS CITY|  66101|         11.6|
+------------+-------+-------------+

You can observe that the join between temperature_streaming and CityZipcode is reflected in the console output automatically. This is the power of Spark streaming. Instead of printing the output to the console, you can easily persist it into a file or keep it in memory and apply further analytics. With this chapter, you are able to successfully use Spark structured streaming to process streaming data.
