Chapter 7. Structured Streaming

Evolution of the Apache Spark Stream Processing Engine

Stream processing is defined as the continuous processing of endless streams of data. With the advent of big data, stream processing systems transitioned from single-node processing engines to multi-node, distributed processing engines. Traditionally, distributed stream processing has been implemented with a record-at-a-time processing model, as shown in Figure 7-1.

Figure 7-1. Traditional, record-at-a-time processing model

The processing pipeline is composed of a directed graph of nodes, as shown in Figure 7-1 with nodes 1-3. Each node continuously receives one record at a time, processes it, and then forwards the generated record(s) to the next node in the graph. This processing model can achieve very low latencies; that is, an input record can be processed by the pipeline and the resulting output can be generated within milliseconds. However, this model is not very efficient at recovering from node failures and straggler nodes (i.e., nodes slower than others): it can either recover from a failure very fast with a lot of extra failover resources, or use minimal extra resources but recover slowly1.

The Advent of Micro-batch Stream Processing

This traditional approach was challenged by Apache Spark when it introduced Spark Streaming (also called DStreams), which put forward the idea of micro-batch stream processing, where the streaming computation is modeled as a continuous series of small, map-reduce-style batch processing jobs (hence, "micro-batches") on small chunks of the stream data. This is illustrated in Figure 7-2.

Figure 7-2. The micro-batch processing model

As shown in Figure 7-2, Spark Streaming divides the data from the input stream into, say, 1-second micro-batches. Each batch is processed in the Spark cluster in a distributed manner with small deterministic tasks that generate the output in micro-batches. Breaking down the streaming computation into these small deterministic tasks gives us two advantages over the traditional, continuous-operator model:

Spark’s agile task scheduling can very quickly and efficiently recover from failures and straggler executors by rescheduling one or more copies of the tasks on any of the other executors.

The deterministic nature of the tasks ensures that the output data is the same no matter how many times the task is re-executed. This crucial characteristic enabled Spark Streaming to provide end-to-end exactly-once processing guarantees.

This efficient fault tolerance does come at the cost of latency: the micro-batch model cannot achieve millisecond-level latencies; it usually achieves latencies of a few seconds (as low as half a second in some cases). However, we have observed that for an overwhelming majority of stream processing use cases, the benefits of micro-batch processing outweigh the drawback of second-scale latencies. This is because most streaming pipelines have at least one of the following characteristics:

The pipeline does not need latencies lower than a few seconds. For example, when the streaming output is only going to be read by hourly jobs, it is not useful to generate the output with sub-second latencies.

There are larger delays in other parts of the pipeline. For example, if the writes by a sensor into Kafka are batched for achieving higher throughput, then no amount of optimization in the downstream processing systems can make the end-to-end latency lower than the batching delays.

Furthermore, the DStream API of Spark Streaming was built upon Spark's batch RDD APIs. Therefore, DStreams had the same functional semantics and fault-tolerance model as RDDs. Spark Streaming thus proved that it is possible for a single, unified processing engine to provide consistent APIs and semantics for batch, interactive, and streaming workloads. This fundamental paradigm shift in stream processing propelled Spark Streaming to become one of the most widely used open-source stream processing engines.

Lessons Learnt from Spark Streaming (DStreams)

Despite all the advantages, the DStream API was not without its flaws, as we discovered while working with developers in the Spark community. Here are a few key areas where the DStream API could be improved upon.

Lack of a single API for batch and streaming - Even though DStreams and RDDs have consistent APIs (i.e., same operations and same semantics), developers still had to explicitly rewrite their code to use different classes when converting their batch jobs to streaming jobs.

Lack of separation between logical and physical plans - Spark Streaming executes the DStream operations in the same sequence in which they were specified by the developer. Since developers effectively specify the exact physical plan, there is no scope for automatic optimizations, and developers have to hand-optimize the code to get the best performance.

Lack of native support for event-time windows - DStreams define window operations based only on the time when each record is received by Spark Streaming (known as processing time). However, many use cases need to calculate windowed aggregates based on the time when the records were generated (known as event time) instead of when they were received or processed. The lack of native support for event-time windows made it hard for developers to build such pipelines with Spark Streaming.

These drawbacks shaped the design philosophy in Structured Streaming, which we will discuss next.

The Philosophy of Structured Streaming

Based on these lessons from DStreams, Structured Streaming was designed from scratch with one core philosophy: for developers, writing stream processing pipelines should be as easy as writing batch pipelines. In a nutshell, these are the guiding principles of Structured Streaming.

A single, unified programming model and interface for batch and streaming - This unified model offers a simple API interface for both batch and streaming workloads. You can use familiar SQL or batch-like DataFrame queries (as covered in the previous chapters) on your stream as you would on a batch, leaving the underlying complexities of fault tolerance, optimization, and late data to the engine. In the coming sections, we will examine some of the queries you would write.

A broader definition of stream processing - Big-data processing applications have grown complex enough that the line between real-time processing and batch processing has blurred significantly. With Structured Streaming, we wanted to broaden its applicability from traditional stream processing to a larger class of applications: any application that processes data, whether periodically (e.g., every few hours) or continuously (like traditional streaming applications), should be expressible using Structured Streaming.

Next, we are going to discuss the programming model used by Structured Streaming.

The Programming Model of Structured Streaming

"Table" is a well-known concept that developers are familiar with when building batch applications. Structured Streaming extends this concept to streaming applications by treating a stream as an unbounded, continuously appended table, as illustrated in Figure 7-3.

Figure 7-3. Programming model of Structured Streaming: data stream as an unbounded table

Every new record received in the data stream, as shown in Figure 7-3, is like a new row being appended to the unbounded input table. Structured Streaming will not actually retain all the input, but the result produced by Structured Streaming up to time T will be equivalent to having all of the data up to T in a static, bounded table and running a batch job on that table.

As shown in Figure 7-4, the developer then defines a query on this conceptual input table, as if it were a static table, to compute a final result table that will be written to an output sink. Structured Streaming automatically converts this batch-like query to a streaming execution plan. This is called incrementalization: Structured Streaming figures out what state needs to be maintained to update the result each time a record arrives. Finally, the developer specifies triggering policies to control when to update the results. Each time a trigger fires, Structured Streaming checks for new data (i.e., new rows in the input table) and incrementally updates the result.

Figure 7-4. Structured Streaming Processing Model

The last part of the model is output mode. Each time the result table is updated, the developer will want to write the updates to an external system, such as a file system (e.g., HDFS, Amazon S3), or a database (e.g., MySQL, Cassandra). We usually want to write output incrementally. For this purpose, Structured Streaming provides three output modes:

Append mode: Only the new rows appended to the result table since the last trigger will be written to the external storage. This is applicable only to queries where existing rows in the result table cannot change (e.g., a map on an input stream).

Update mode: Only the rows that were updated in the result table since the last trigger will be changed in the external storage. This mode works for output sinks that can be updated in place, such as a MySQL table.

Complete mode: The entire updated result table will be written to external storage.

Note: Unless Complete mode is specified, the Result table will not be fully materialized by Structured Streaming. Just enough partial information (known as “state”) will be maintained by Structured Streaming such that the changes in Result Table can be computed and the updates can be outputted.

Thinking of the data streams as tables not only makes it easier to conceptualize the logical computations on the data, but also makes it easier to express them in code. Since Spark's DataFrame is a programmatic representation of a table, you can use the same DataFrame APIs to express your computations on streaming data. All you need to do is define an input DataFrame (i.e., the input table) from a streaming data source, and then apply operations on the DataFrame in the same way as you would on a DataFrame defined on a batch source.

In the next section, we will see how easy it is to write Structured Streaming queries using DataFrames.

The Fundamentals of a Structured Streaming Query

In this section, we are going to walk through high-level concepts that you need to understand to develop Structured Streaming queries. We will first walk through the key steps to define and start a streaming query. Then we will discuss how to monitor the active query and manage its life cycle.

Five Steps to Define a Streaming Query

As we discussed in the previous section, Structured Streaming uses the same DataFrame APIs as batch queries to express the data processing logic. However, there are a few key differences you need to know for defining a Structured Streaming query. In this section, we will explore the steps to define a streaming query by building a simple query that reads streams of text data over a socket and counts the words.

Step 1: Define input sources

As with batch queries, the first step is to define a DataFrame from a streaming source. However, whereas reading batch data requires spark.read to create a DataFrameReader, reading streaming data requires spark.readStream to create a DataStreamReader. DataStreamReader has most of the same methods as DataFrameReader, so you can use it in a very similar way. Here is an example of creating a DataFrame from a text data stream to be received over a socket connection:

# In Python
spark = SparkSession...
lines = (spark
  .readStream.format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load())

// In Scala
val spark = SparkSession...
val lines = spark
  .readStream.format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

This code generates the lines DataFrame as an "unbounded table" of newline-separated text data read from localhost:9999. Note that, similar to batch sources with spark.read, this does not immediately start reading the streaming data; it only sets up the configurations necessary for reading the data once the streaming query is explicitly started.

Besides sockets, Apache Spark natively supports reading data streams from Apache Kafka and all the various file-based formats that DataFrameReader supports (e.g., Parquet, ORC, JSON, etc.). The details of these sources and their supported options are discussed later in this chapter. Furthermore, a streaming query can define multiple input sources, both streaming and batch sources, which can be combined together using DataFrame operations like unions and joins (also discussed later in this chapter).
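For illustration, here is a minimal sketch (with hypothetical directory paths) of combining two streaming file sources with a union; both sides must have the same schema:

# In Python
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField("value", StringType())])
csv_stream = spark.readStream.format("csv").schema(schema).load("/data/csv_dir")
json_stream = spark.readStream.format("json").schema(schema).load("/data/json_dir")
combined = csv_stream.union(json_stream)  # a single streaming DataFrame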

Step 2: Transform data

Now we can apply the usual DataFrame operations, such as splitting the lines into individual words and then counting them, as shown in the code below:

# In Python
from pyspark.sql.functions import col, split

words = lines.select(split(col("value"), "\\s").alias("word"))
counts = words.groupBy("word").count()

// In Scala
import org.apache.spark.sql.functions._

val words = lines.select(split(col("value"), "\\s").as("word"))
val counts = words.groupBy("word").count()

counts is a streaming DataFrame (that is, a DataFrame on unbounded, streaming data) that represents the running word counts that will be computed once the streaming query is started and the streaming input data is continuously processed.

Note that the above DataFrame operations to transform the lines streaming DataFrame would work in the exact same way if lines were a batch DataFrame. In general, most DataFrame operations that can be applied on a batch DataFrame can also be applied on a streaming DataFrame. To understand which operations are supported in Structured Streaming, you have to recognize the two broad classes of data transformations.

Stateless transformations: Operations like select, filter, map, etc., do not require any information from previous rows to process the next row; each row can be processed by itself. The lack of previous "state" in these operations makes them stateless. Stateless operations can be applied to both batch and streaming DataFrames. For instance, the select operation in our code snippet is a stateless operation.

Stateful transformations: In contrast, an aggregation operation like count in the above snippet requires maintaining state to combine data across multiple rows. More specifically, any DataFrame operation involving grouping, joining, or aggregation is a stateful transformation. While many of these operations are supported in Structured Streaming, a few combinations of them are not supported because it is either computationally hard or infeasible to compute them in an incremental manner.

The stateful operations supported by Structured Streaming and how to manage their state at runtime are discussed later in the chapter.

Step 3: Define output sink and output mode

After transforming the data, we can define how to write the processed output data with DataFrame.writeStream (instead of DataFrame.write for batch data). This creates a DataStreamWriter which, similar to DataFrameWriter, has additional methods to specify the following:

Output writing details - where and how to write the output.

Processing details - how to process the data and how to recover from failures.

Let us first focus on the output writing details here (we will cover the processing details in the next step). For example, the following snippet shows how to write the final counts to the console:

# In Python
writer = (counts.writeStream
  .format("console")
  .outputMode("complete"))

// In Scala
val writer = counts.writeStream
  .format("console")
  .outputMode("complete")

Here we have specified "console" as the output streaming sink and "complete" as the output mode. The output mode of a streaming query specifies what part of the updated output to write out after new input data has been processed. In this example, as a chunk of new input data is processed and the word counts are updated, we can choose to print to the console either the counts of all the words seen till now (that is, complete mode, as in the above snippet) or only those words that were updated in the last chunk of input data. The specified output mode can have one of the following values (as we already saw in the section "The Programming Model of Structured Streaming"):

Append mode: This is the default mode, where only the new rows added to the result table/DataFrame (for example, the counts table) since the last trigger will be outputted to the sink. Semantically, this mode guarantees that any row that is outputted is never going to be changed or updated by the query in the future. Hence, Append mode is supported only by queries (e.g., stateless queries) that will never modify previously outputted data. In contrast, our word count query can update previously generated counts; therefore, it does not support Append mode.

Complete mode - In this mode, all the rows of the result table/DataFrame will be outputted at the end of every trigger. This is supported by queries where the result table is likely to be much smaller than the input data and is therefore feasible to be retained in memory. For example, our word count supports complete mode because the counts data is likely to be far smaller than that of the input data.

Update mode - In this mode, only the rows of the result table/DataFrame that were updated since the last trigger will be outputted at the end of every trigger. This is in contrast to Append mode, as the outputted rows may be modified by the query and outputted again in the future. Most queries support Update mode.

More recent details of output modes supported by different queries can be found in the latest Structured Streaming Programming Guide2.

Besides writing the output to the console, Structured Streaming natively supports streaming writes to files and Apache Kafka. In addition, you can write to arbitrary locations using the foreachBatch and foreach API methods. In fact, you can use foreachBatch to write streaming outputs using existing batch data sources. The details of these sinks and their supported options are discussed later in this chapter.

Step 4: Specify processing details

The final step before starting the query is to specify details of how to process the data. Continuing with our word count example, we are going to specify the processing details as follows.

# In Python
checkpoint_dir = "..."
writer2 = (writer
  .trigger(processingTime="1 second")
  .option("checkpointLocation", checkpoint_dir))

// In Scala
import org.apache.spark.sql.streaming.Trigger

val checkpointDir = "..."
val writer2 = writer
  .trigger(Trigger.ProcessingTime("1 second"))
  .option("checkpointLocation", checkpointDir)

Here we have specified two details using the DataStreamWriter that we had created with DataFrame.writeStream.

Triggering details - When to trigger the discovery and processing of newly available streaming data. There are three options; a short sketch of specifying them in Python follows these details.

ProcessingTime with or without a trigger interval - This is the default triggering mode, which processes streaming data in micro-batches. By default, when no trigger is specified, the query triggers the next micro-batch as soon as the previous micro-batch has completed. Alternatively, you can explicitly specify the ProcessingTime trigger with an interval, and the query will trigger micro-batches at that fixed interval.

Once - In this mode, the streaming query will execute exactly one micro-batch: it processes all the new data available in a single batch and then stops itself. This is useful when you want to control the triggering and processing from an external scheduler that will restart the query on a custom schedule (e.g., to control cost by executing a query only once per day3).

Continuous - This is an experimental mode (as of Spark 3.0) where the streaming query will process data continuously instead of micro-batches. While only a small subset of DataFrame operations allow this mode to be used, it can provide much lower latency (as low as milliseconds) than the micro-batch trigger modes. Refer to the latest Structured Streaming Programming Guide for the most up to date information.

Checkpoint location - This is a directory in any HDFS-compatible file system where a streaming query saves its progress information, that is, what data has been successfully processed. Upon failure, this metadata is used to restart the failed query exactly where it left off. Therefore, setting this option is necessary for failure recovery with exactly-once guarantees.
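As a rough sketch, here is how these trigger options can be specified in Python (note that the Python API takes keyword arguments instead of the Scala Trigger object):

# In Python
writer.trigger(processingTime="1 second")  # micro-batches at a fixed interval
writer.trigger(once=True)                  # a single micro-batch, then stop
writer.trigger(continuous="1 second")      # experimental continuous processing;
                                           # the interval sets checkpoint frequency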

Step 5: Start the query

Once everything has been specified, the final step is to start the query, which you can do as follows:

# In Python
streaming_query = writer2.start()

// In Scala
val streamingQuery = writer2.start()

The returned instance of type StreamingQuery represents an active query and can be used to manage the query, which we will cover later in this chapter.

Note that start() is a non-blocking method, so it will return as soon as the query has been started in the background. If you want the main thread to block until the streaming query has terminated, you can use streamingQuery.awaitTermination(). If the query fails in the background with an error, awaitTermination() will also fail with that same exception.

Furthermore, you can wait up to a timeout duration using awaitTermination(timeoutMillis). Finally, you can explicitly stop the query with streamingQuery.stop().
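For example, here is a minimal sketch of waiting with a timeout and then stopping the query if it is still running. Note that, unlike the Scala API, the PySpark version of awaitTermination() takes the timeout in seconds:

# In Python
terminated = streaming_query.awaitTermination(60)  # wait up to 60 seconds
if not terminated:
    streaming_query.stop()  # explicitly stop the still-running query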

Putting it all together

To summarize, here is the complete code for reading streams of text data over a socket, counting the words, and printing the counts to the console:

# In Python
from pyspark.sql.functions import col, split

spark = SparkSession...
lines = (spark
  .readStream.format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load())
words = lines.select(split(col("value"), "\\s").alias("word"))
counts = words.groupBy("word").count()
checkpoint_dir = "..."
streaming_query = (counts.writeStream
  .format("console")
  .outputMode("complete")
  .trigger(processingTime="1 second")
  .option("checkpointLocation", checkpoint_dir)
  .start())
streaming_query.awaitTermination()

// In Scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession...
val lines = spark
  .readStream.format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
val words = lines.select(split(col("value"), "\\s").as("word"))
val counts = words.groupBy("word").count()
val checkpointDir = "..."
val streamingQuery = counts.writeStream
  .format("console")
  .outputMode("complete")
  .trigger(Trigger.ProcessingTime("1 second"))
  .option("checkpointLocation", checkpointDir)
  .start()
streamingQuery.awaitTermination()

After the query has started, a background thread continuously reads new data from the streaming source, processes it, and writes it to the streaming sinks. Next, let’s take a quick peek under the hood at how this is executed.

Under the Hood of an Active Streaming Query

Once the query starts, the following sequence of steps transpires in the engine, as depicted in Figure 7-5. The DataFrame operations are converted into a logical plan, which is an abstract representation of the computation that Spark SQL uses to plan a query.

Spark SQL analyzes and optimizes this logical plan to ensure that it can be executed incrementally and efficiently on streaming data.

Spark SQL starts a background thread that continuously executes the following loop4:

Based on the configured trigger interval, the thread checks the streaming sources for the availability of new data.

If new data is available, it is processed by running a micro-batch. From the optimized logical plan, an optimized Spark execution plan is generated that reads the new data from the source, incrementally computes the updated result, and writes the output to the sink according to the configured output mode.

For every micro-batch, the exact range of data processed (e.g., the set of files, or the range of Apache Kafka offsets) and any associated state are saved in the configured checkpoint location so that the query can deterministically reprocess the exact range if needed.

This loop continues until the query is terminated, which can occur in one of the following ways:

A failure in the query, either a processing error or a failure in the cluster.

Explicitly stopping it using streamingQuery.stop().

If the trigger is set to Once, then the query will stop on its own after a single micro-batch on all the available data.

Figure 7-5. Incremental execution of streaming queries

Note: A key point you should remember about Structured Streaming is that it uses Spark SQL underneath to process the data. As such, the full power of Spark SQL's hyper-optimized execution engine is utilized to maximize stream processing throughput, providing key performance advantages.

Next, we will discuss how to restart the streaming query after termination and the life cycle of a streaming query.

Recovering from Failures with Exactly-once Guarantees

To restart a terminated query in a completely new process, you have to create a new SparkSession, redefine all the DataFrames, and start the streaming query on the final result using the same checkpoint location as the one used when the query was started the first time. For our word count example, you can simply re-execute the entire code snippet shown earlier, from the definition of spark in the first line to the final start() in the last line.

The checkpoint location must be the same across restarts because this directory contains the unique identity of a streaming query and determines its life cycle. If the checkpoint directory is deleted, or the same query is started with a different checkpoint directory, then it is like starting a new query from scratch. Specifically, checkpoints have record-level information (e.g., Apache Kafka offsets) to track the data range the last incomplete micro-batch was processing. The restarted query uses this information to reprocess exactly the same range of data. Coupled with Spark's deterministic task execution, the regenerated output will be the same as it was expected to be before the restart.

Structured Streaming can provide end-to-end exactly-once guarantees (that is, the output is as if each input record was processed exactly once) when the following conditions are satisfied:

Replayable streaming sources - The data range of the last incomplete micro-batch can be re-read from the source.

Deterministic computations - All data transformations deterministically produce the same result when given the same input data.

Idempotent streaming sink - The sink must be able to identify re-executed micro-batches and ignore duplicate writes that may be caused by restarts.

Note that our word count example does not provide exactly-once guarantees, because the socket source is not replayable and the console sink is not idempotent.

As a final note regarding restarting queries, it is possible to make minor modifications to a query between restarts. Here are a few ways you can modify the query.

DataFrame transformations - You can make minor modifications to the transformations between restarts. For example, in our streaming word count, if you want to ignore lines that have corrupted byte sequences that can crash the query, you can add a filter in the transformations:

# In Python
# is_corrupted_udf = udf to detect corruption in string
filtered_lines = lines.filter("is_corrupted_udf(value) = false")
words = filtered_lines.select(split(col("value"), "\\s").alias("word"))

// In Scala
// isCorruptedUdf = udf to detect corruption in string
val filteredLines = lines.filter("isCorruptedUdf(value) = false")
val words = filteredLines.select(split(col("value"), "\\s").as("word"))

Upon restarting with this modified words DataFrame, the restarted query will apply the filter on all data processed from the restart onward (including the last incomplete micro-batch), preventing the query from failing again.

Source and sink options - Whether a readStream or writeStream option can be changed between restarts depends on the semantics of the specific source or sink. For example, you should not change the host and port options for the socket source if data is going to be sent to that host and port. But you can add an option to the console sink to print up to 100 changed counts after every trigger:

writeStream.format("console").option("numRows", "100")...

Processing details - As discussed earlier, the checkpoint location must not be changed between restarts. However, other details like trigger interval can be changed without breaking fault-tolerance guarantees.

The narrow set of changes that are allowed between restarts are discussed in the latest Structured Streaming Programming Guide5.

Monitoring an Active Query

An important part of running a streaming pipeline in production is to track its health. Structured Streaming provides several ways to track the status and processing metrics of an active query.

Querying current status using StreamingQuery

You can query the current health of an active query using the StreamingQuery instance. Here are two methods.

Get current metrics using StreamingQuery.lastProgress() and StreamingQuery.recentProgress() - When a query processes some data in a micro-batch, we consider it to have made progress. lastProgress() returns information on the last completed micro-batch. For example, printing the returned object (StreamingQueryProgress in Scala/Java, or a dictionary in Python) will produce something like this (in Scala/Java):

{
  "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
  "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
  "name" : "MyQuery",
  "timestamp" : "2016-12-14T18:45:24.873Z",
  "numInputRows" : 10,
  "inputRowsPerSecond" : 120.0,
  "processedRowsPerSecond" : 200.0,
  "durationMs" : {
    "triggerExecution" : 3,
    "getOffset" : 2
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-0]]",
    "startOffset" : {
      "topic-0" : {
        "2" : 0,
        "1" : 1,
        "0" : 1
      }
    },
    "endOffset" : {
      "topic-0" : {
        "2" : 0,
        "1" : 134,
        "0" : 534
      }
    },
    "numInputRows" : 10,
    "inputRowsPerSecond" : 120.0,
    "processedRowsPerSecond" : 200.0
  } ],
  "sink" : {
    "description" : "MemorySink"
  }
}

Some of the noteworthy fields are the following (a sketch of using them follows this list).

id - Unique identifier tied to a checkpoint location. It stays the same throughout the lifetime of a query (i.e., same across restarts).

runId - Unique identifier for the current (re)started instance of the query. It changes with every restart.

numInputRows - Number of input rows that were processed in the last micro-batch.

inputRowsPerSecond - Current rate at which input rows are being generated at the source (average over the last micro-batch duration).

processedRowsPerSecond - Current rate at which rows are being processed and written out by the sink (average over the last micro-batch duration). If this rate is consistently lower than the input rate, then the query is unable to process data as fast as it is being generated by the source. This is a key indicator of the health of the query.

sources and sink - Provides source/sink-specific details of the data processed in the last batch.
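As an illustrative sketch, you can use these fields to implement a simple health check (in PySpark, lastProgress is a property that returns the last progress as a dictionary):

# In Python
progress = streaming_query.lastProgress
if progress:  # None until the first micro-batch completes
    if progress["processedRowsPerSecond"] < progress["inputRowsPerSecond"]:
        print("Warning: query is processing slower than the input rate")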

Get current status using StreamingQuery.status() - This provides information on what the background query thread is doing at the moment. For example, printing the returned object will produce something like this (in Scala/Java):

{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}

Publishing metrics using Dropwizard Metrics

Spark supports reporting metrics via a popular library called Dropwizard Metrics6. This library allows metrics to be published to many popular monitoring frameworks (e.g., Ganglia, Graphite). These metrics are not enabled by default for Structured Streaming queries due to the high volume of data they report. To enable them, apart from configuring Dropwizard Metrics for Spark (see the Spark Monitoring and Instrumentation Guide7 for more details), you also have to explicitly set the SparkSession configuration spark.sql.streaming.metricsEnabled to true before starting the query.
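For example, you can set this configuration on the SparkSession before starting the query:

# In Python
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")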

Note that only a subset of the information available through StreamingQuery.lastProgress() is published through Dropwizard Metrics. If you want to continuously publish more progress information to arbitrary locations, you have to write custom listeners, as discussed next.

Publishing metrics using custom StreamingQueryListeners

StreamingQueryListener is an event listener interface with which you can inject arbitrary logic to continuously publish metrics. This developer API is available only in Scala/Java. There are two steps to using custom listeners.

Define your custom listener - The StreamingQueryListener interface provides three methods that can be defined by your implementation to get three types of events related to a streaming query: start, progress (i.e., a trigger was executed), and termination. Here is an example:

// In Scala
val myListener = new StreamingQueryListener() {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {
    println("Query started: " + event.id)
  }
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
    println("Query terminated: " + event.id)
  }
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    println("Query made progress: " + event.progress)
  }
}

Add your listener to the SparkSession before starting the query:

// In Scala
spark.streams.addListener(myListener)

After adding the listener, all events of streaming queries running on this SparkSession will start calling the listener’s methods.

Streaming Data Sources and Sinks

Now that we have covered the basic steps you need to express an end-to-end Structured Streaming query, let's examine how to use the built-in streaming data sources and sinks. As a reminder, you can create DataFrames from streaming sources using SparkSession.readStream, and write the output from a result DataFrame using DataFrame.writeStream. In each case, you can specify the source or sink type using the method format(). We will see a few concrete examples later.

Files

Structured Streaming supports reading and writing data streams to files in the same formats as the ones supported in batch processing: plain text, CSV, JSON, Parquet, and ORC. Here we will discuss how to operate Structured Streaming on files.

Reading from files

Structured Streaming can treat files written into a directory as a data stream. Here is an example.

# In Python
from pyspark.sql.types import *

input_directory_of_json_files = ...
file_schema = (StructType()
  .add(StructField("key", StringType()))
  .add(StructField("value", StringType())))
input_df = (spark.readStream
  .format("json")
  .schema(file_schema)
  .load(input_directory_of_json_files))

// In Scala
import org.apache.spark.sql.types._

val inputDirectoryOfJsonFiles = ...
val fileSchema = new StructType()
  .add("key", StringType)
  .add("value", StringType)
val inputDF = spark.readStream
  .format("json")
  .schema(fileSchema)
  .load(inputDirectoryOfJsonFiles)

The returned streaming DataFrame will have the specified schema. Here are a few key points to remember when using files:

All the files must be of the same format and are expected to have the same schema. For example, if the format is JSON, all the files must be in the JSON format with one JSON record per line. The schema of each JSON record must match the one specified with readStream. Violating these assumptions can lead to incorrect parsing (e.g., unexpected null values) or query failures.

Each file must appear in the directory listing atomically, that is, the whole file must be available at once for reading, and once it is available, the file cannot be updated or modified. This is because Structured Streaming will process the file when the engine finds it (using directory listing) and internally mark it as processed. Any changes to that file will not be processed.

When there are multiple new files to process but the engine can pick up only some of them in the next micro-batch (e.g., because of rate limits), it will select the files with the earliest timestamps. Within a micro-batch, however, there is no predefined order for reading the selected files; all of them will be read in parallel.

Note: This streaming file source supports a number of options - (i) file-format-specific options, the same as those supported in batch with spark.read (see Chapter 4, section "Data Sources for DataFrames and SQL Tables"), and (ii) common options supported only in streaming (e.g., maxFilesPerTrigger to limit the rate of file processing). See the programming guide8 for full details.
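For instance, here is a sketch of rate-limiting the file source from the earlier example using maxFilesPerTrigger:

# In Python
input_df = (spark.readStream
  .format("json")
  .schema(file_schema)
  .option("maxFilesPerTrigger", 10)  # at most 10 new files per micro-batch
  .load(input_directory_of_json_files))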

Writing to files

Structured Streaming supports writing streaming query output to files in the same formats as reads. However, it only supports append mode, because while it is easy to write new files to the output directory (i.e., append data to a directory), it is hard to modify existing data files (as would be required by update and complete modes). Similar to batch writes, it supports partitioning. Here is an example:

# In Python
output_directory = ...
streaming_query = (resultDataFrame.writeStream
  .format("parquet")
  .option("path", output_directory)
  .partitionBy("date")
  .start())

// In Scala
val outputDirectory = ...
val streamingQuery = resultDataFrame.writeStream
  .format("parquet")
  .option("path", outputDirectory)
  .partitionBy("date")
  .start()

Instead of using the path option, you can also specify the output directory directly as start(output_directory).

A few key points to remember:

Structured Streaming achieves end-to-end exactly-once guarantees when writing to files by maintaining a log of the data files that have been written to the directory. This log is maintained in the subdirectory _spark_metadata. Any Spark query on the directory (not its subdirectories) will automatically use the log to read the correct set of data files, so that the exactly-once guarantee is maintained (i.e., no duplicate data or partial files are read). Note that other processing engines may not be aware of this log and hence may not provide the same guarantee.

If you change the schema of the result DataFrame between restarts, then the output directory will have data in multiple schemas. These schemas have to be reconciled when querying the directory, as sketched below.
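For example, with Parquet output you might reconcile compatible schemas at read time using Parquet's mergeSchema option (a sketch, assuming the schemas are mergeable):

# In Python
df = (spark.read
  .option("mergeSchema", "true")  # merge the compatible Parquet schemas
  .parquet(output_directory))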

Apache Kafka

Apache Kafka9 is a popular publish/subscribe system that is widely used for storing data streams. Structured Streaming has built-in support for reading from and writing to Apache Kafka.

Reading from Kafka

To perform distributed reads from Kafka, you have to use options to specify how to connect to Kafka. Say you want to subscribe to data from the topic "events". Here is how you can create a streaming DataFrame:

# In Python
input_df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "events")
  .load())

// In Scala
val inputDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "events")
  .load()

The returned DataFrame will have the following schema:

key (binary) - Key data of the record as bytes.
value (binary) - Value data of the record as bytes.
topic (string) - Kafka topic the record was in. This is useful when subscribed to multiple topics.
partition (int) - Partition of the Kafka topic the record was in.
offset (long) - Offset value of the record.
timestamp (long) - Timestamp associated with the record.
timestampType (int) - Enumeration for the type of the timestamp associated with the record. See the Kafka documentation.

You can also choose to subscribe to multiple topics, a pattern of topics, or even a specific partition of a topic. Furthermore, you can choose to read only the new data in the subscribed topics or process all the available data in them. Lastly, you can even read Kafka data from batch queries, that is, treat Kafka topics like tables. See the online Kafka Integration Guide for more details10. A short sketch of a few of these options follows.
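For illustration, here is a sketch (with hypothetical servers and topic names) of subscribing to a pattern of topics and processing all the data available in them from the beginning:

# In Python
df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "events.*")  # subscribe to a pattern of topics
  .option("startingOffsets", "earliest")   # process all the available data
  .load())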

Writing to Kafka

For writing to Kafka, Structured Streaming expects the result DataFrame to have a few columns with specific names and types:

key (optional; string or binary) - If present, the bytes will be written as the Kafka record key; otherwise, the key will be empty.
value (required; string or binary) - The bytes will be written as the Kafka record value.
topic (string; required only if "topic" is not specified as an option) - Determines the topic to write the key-value pair to. This is useful for fanning out writes to multiple topics. If the "topic" option has been specified, this value is ignored.

You can write to Kafka in all three output modes, though Complete mode is not recommended, as it will repeatedly output the same records. Here is a concrete example of writing the output of our earlier word count query to Kafka in Update mode:

# In Python
counts = ...  # DataFrame[word: string, count: long]
streaming_query = (counts
  .selectExpr(
    "cast(word as string) as key",
    "cast(count as string) as value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "word_counts")
  .outputMode("update")
  .option("checkpointLocation", checkpoint_dir)
  .start())

// In Scala
val counts = ... // DataFrame[word: string, count: long]
val streamingQuery = counts
  .selectExpr(
    "cast(word as string) as key",
    "cast(count as string) as value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "word_counts")
  .outputMode("update")
  .option("checkpointLocation", checkpointDir)
  .start()

See the online Kafka Integration Guide for more details11.

Custom Streaming Sources and Sinks

In this section, we will discuss how to read from and write to storage systems that do not have built-in support in Structured Streaming. In particular, we will see how to use the foreachBatch and foreach methods to implement custom logic for writing to your storage.

Writing to any storage system

There are two operations that allow you to write the output of a streaming query to arbitrary storage systems: foreachBatch and foreach. They have slightly different use cases: foreach allows custom write logic on every row, while foreachBatch allows arbitrary operations and custom logic on the output of each micro-batch. Let's explore their usage in more detail.

Using foreachBatch

foreachBatch(...) allows you to specify a function that is executed on the output of every micro-batch of a streaming query. It takes two parameters: a DataFrame or Dataset that has the output of a micro-batch, and the unique identifier of the micro-batch. Here is an example: say we want to write the output of our earlier word count query to Apache Cassandra12. As of Spark Cassandra Connector13 2.4.2, there is no support for writing streaming DataFrames. But you can use the connector's batch DataFrame support to write the output of each batch (i.e., updated word counts) to Cassandra, as shown below:

# In Python
host_addr = "<ip address>"
keyspace_name = "<keyspace>"
table_name = "<tableName>"
spark.conf.set("spark.cassandra.connection.host", host_addr)

def write_counts_to_cassandra(updated_counts_df, batch_id):
    # Use the Cassandra batch data source to write the updated counts
    (updated_counts_df.write
      .format("org.apache.spark.sql.cassandra")
      .mode("append")
      .options(table=table_name, keyspace=keyspace_name)
      .save())

streaming_query = (counts.writeStream
  .foreachBatch(write_counts_to_cassandra)
  .outputMode("update")
  .option("checkpointLocation", checkpoint_dir)
  .start())

// In Scala
val hostAddr = "<ip address>"
val keyspaceName = "<keyspace>"
val tableName = "<tableName>"
spark.conf.set("spark.cassandra.connection.host", hostAddr)

def writeCountsToCassandra(updatedCountsDF: DataFrame, batchId: Long) {
  // Use the Cassandra batch data source to write the updated counts
  updatedCountsDF.write
    .format("org.apache.spark.sql.cassandra")
    .options(Map("table" -> tableName, "keyspace" -> keyspaceName))
    .mode("append")
    .save()
}

val streamingQuery = counts.writeStream
  .foreachBatch(writeCountsToCassandra)
  .outputMode("update")
  .option("checkpointLocation", checkpointDir)
  .start()

With foreachBatch, you can do the following:

Reuse existing batch data sources - As shown in the earlier example, with foreachBatch, you can use existing batch data sources (i.e., sources that support writing batch DataFrames) to write the output of streaming queries.

Write to multiple locations - If you want to write the output of a streaming query to multiple locations (e.g., an OLAP data warehouse and an OLTP database), then you can simply write the output DataFrame/Dataset multiple times. However, each write can cause the output data to be recomputed (including possible rereading of the input data). To avoid recomputation, you should cache the micro-batch output DataFrame, write it to the multiple locations, and then unpersist it:

# In Python
def write_counts_to_multiple_locations(updated_counts_df, batch_id):
    updated_counts_df.persist()
    updated_counts_df.write.format(...).save()  # location 1
    updated_counts_df.write.format(...).save()  # location 2
    updated_counts_df.unpersist()

// In Scala
def writeCountsToMultipleLocations(
    updatedCountsDF: DataFrame,
    batchId: Long) {
  updatedCountsDF.persist()
  updatedCountsDF.write.format(...).save()  // location 1
  updatedCountsDF.write.format(...).save()  // location 2
  updatedCountsDF.unpersist()
}

Apply additional DataFrame operations - Many DataFrame API operations are not supported14 on streaming DataFrames because Structured Streaming does not support generating incremental plans for them. Using foreachBatch, you can apply some of these operations to each micro-batch output. However, you will have to reason about the end-to-end semantics of doing so yourself.

Note: foreachBatch only provides at-least-once write guarantees. You can get exactly-once guarantees by using the batchId to deduplicate multiple writes from re-executed micro-batches.
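As a rough sketch (with a hypothetical output path), one way to do this is to record the batch identifier alongside the data so that rows from a re-executed micro-batch can be identified and ignored downstream:

# In Python
from pyspark.sql.functions import lit

def idempotent_write(updated_counts_df, batch_id):
    # Tag every row with its micro-batch id; a re-executed micro-batch writes
    # the same batch_id, so downstream readers can deduplicate on it
    (updated_counts_df
      .withColumn("batch_id", lit(batch_id))
      .write
      .mode("append")
      .parquet("/output/word_counts"))  # hypothetical path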

Using foreach

If foreachBatch is not an option (for example, if a corresponding batch data writer does not exist), then you can express your custom writer logic using foreach. Specifically, you can express the data writing logic by dividing it into three methods: open, process, and close. Structured Streaming will use these methods to write each partition of the output records. Here is an abstract example:

# In Python
class ForeachWriter:
    def open(self, partition_id, epoch_id):
        # Open connection to data store
        # Return True if write should continue
        # This method is optional in Python;
        # if not specified, the write will continue automatically
        return True

    def process(self, row):
        # Write string to data store using opened connection
        # This method is NOT optional in Python
        pass

    def close(self, error):
        # Close the connection
        # This method is optional in Python
        pass

streaming_df.writeStream.foreach(ForeachWriter()).start()

// In Scala
val foreachWriter = new ForeachWriter[String] {
  def open(partitionId: Long, version: Long): Boolean = {
    // Open connection to data store
    // Return true if write should continue
  }
  def process(record: String): Unit = {
    // Write string to data store using opened connection
  }
  def close(errorOrNull: Throwable): Unit = {
    // Close the connection
  }
}

streamingDatasetOfString.writeStream
  .foreach(foreachWriter)
  .start()

The detailed semantics of how these methods are executed are discussed in the online programming guide15.

Reading from any storage system

Unfortunately, as of Spark 2.4, the APIs to build custom streaming sources and sinks are not public and stable. Hence, there is no official way to read from arbitrary storage systems. This may change with Spark 3.0 in which the DataSourceV2 initiative is attempting to stabilize those APIs. Please refer to the latest release notes for the most up-to-date information.

Data Transformations

In this section, we are going to dig deeper into the data transformations supported in Structured Streaming. As briefly discussed earlier, only the DataFrame operations that can be executed incrementally are supported in Structured Streaming. These operations are broadly classified into stateless and stateful operations. We will define each type of operation and explain how to identify which operations are stateful.

Incremental Execution and Streaming State

As we discussed in the earlier section "Under the Hood of an Active Streaming Query," the Catalyst optimizer in Spark SQL converts all the DataFrame operations to an optimized logical plan. The Spark SQL planner, which decides how to execute a logical plan, recognizes that this is a streaming logical plan that needs to operate on continuous data streams. Accordingly, instead of converting the logical plan to a one-time physical execution plan, the planner generates a continuous sequence of execution plans. Each execution plan updates the final DataFrame result incrementally; that is, the plan processes only a chunk of new data from the input streams and possibly some intermediate, partial result computed by the previous execution plan.

Each execution is considered a micro-batch, and the partial, intermediate result that is communicated between executions is called the streaming "state." DataFrame operations can be broadly classified into stateless and stateful operations based on whether executing the operation incrementally requires maintaining state. In the rest of this section, we are going to explore the distinction between stateless and stateful operations and how their presence in a streaming query requires different runtime configuration and resource management.

Note: Some logical operations are fundamentally either impractical or very expensive to compute incrementally, and hence they are not supported in Structured Streaming. For example, operations like cube() and rollup() are quite expensive to compute incrementally, and any attempt to start a streaming query with them will throw an UnsupportedOperationException.

Stateless Transformations

All projection operations (e.g., select, explode, map, or flatMap) and selection operations (e.g., filter, where) process each input record individually without needing any information from previous rows. This lack of dependence on prior input data makes them stateless operations.

A streaming query with only stateless operations supports the Append and Update output modes, but not Complete mode. Since any processed output row of such a query cannot be modified by any future data, it naturally supports Append mode and can be written to all streaming sinks (including append-only ones, like files of any format). On the other hand, such queries naturally do not combine information across input records and therefore may not reduce the volume of the data in the result. Storing the ever-growing result data is usually costly; hence, such queries do not support Complete mode. This is in sharp contrast to stateful transformations, as we will discuss next.

Stateful Transformations

The simplest example of a stateful transformation is DataFrame.groupBy().count(), which generates a running count of the number of records received since the beginning of the query. In every micro-batch, the incremental plan adds the count of new records to the previous count generated by the previous micro-batch. This partial count communicated between plans is the "state." The state is maintained in the memory of the Spark executors and is checkpointed to the configured location in order to tolerate failures. While Spark SQL automatically manages the life cycle of this state to ensure correct results, you typically have to tweak a few knobs to control the resource usage of maintaining state. In this section, we are going to explore how different stateful operators manage their state under the hood.

Distributed and Fault-tolerant State Management

Recall from Chapters 1 and 2 that a Spark application running in a cluster has a driver and multiple executors. Spark's scheduler, running in the driver, breaks down your high-level operations into smaller tasks and puts them in task queues, and as resources become available, the executors pull the tasks from the queues to execute them. Each micro-batch in a streaming query essentially performs one such set of tasks that read new data from the streaming sources and write updated output to the streaming sinks. For stateful stream processing queries, besides writing to sinks, each micro-batch of tasks generates intermediate state data that will be consumed by the next micro-batch. This state data generation is completely partitioned and distributed (as is all reading, writing, and processing in Spark), and it is cached in executor memory for efficient consumption. This is illustrated in Figure 7-6, which shows how state is managed in our original streaming word count query.

Figure 7-6. Distributed state management in Structured Streaming

Each micro-batch reads a new set of words, shuffles them within the executors to group them, computes the counts within the micro-batch, and finally adds them to the running counts to produce the new counts. These new counts are both the output and the state for the next micro-batch, and hence they are cached in the memory of the executors. The next micro-batch of data is grouped between the executors in exactly the same way as before, so that each word is always processed by the same executor and can therefore locally read and update its running count.

However, it is not sufficient to just keep this state in memory, as any failure (either of an executor or of the entire application) will cause the in-memory state to be lost. To avoid loss, we synchronously save the key-value state updates as change logs in the checkpoint location provided by the user. These changes are co-versioned with the offset ranges processed in each batch, and the required version of the state can be automatically reconstructed by reading the checkpointed logs. In case of any failure, Structured Streaming is able to re-execute the failed micro-batch by reprocessing the same input data along with the same state that it had before that micro-batch, thus producing the same output data as it would have produced had there been no failure. This is critical for ensuring end-to-end exactly-once guarantees.

To summarize, for all stateful operations, Structured Streaming ensures the correctness of the operation by automatically saving and restoring the state in a distributed manner. Depending on the stateful operation, all you may have to do is tune the state cleanup policy such that old keys and values are automatically dropped from the cached state. This is what we will discuss next.

Types of Stateful Operations

The essence of streaming state is to retain summaries of past data. Sometimes old summaries need to be cleaned out of the state to make room for new summaries. Based on how this cleanup happens, there are two types of stateful operations:

Managed stateful operations - These automatically identify and clean up old state, based on an operation-specific definition of "old." You can tune what is defined as old in order to control the resource usage (e.g., executor memory used to store state). The operations that fall in this category are:

Streaming aggregations

Stream-stream joins

Streaming deduplication

Unmanaged stateful operations - These let you define your own custom state cleanup logic. The operations in this category are:

MapGroupsWithState

FlatMapGroupsWithState

These operations allow you to define arbitrary stateful operations (e.g., sessionization).

Each of these operations is discussed in detail in the following sections.

Stateful Streaming Aggregations

Structured Streaming can incrementally execute most DataFrame aggregation operations. You can aggregate data by keys (e.g., streaming word count) and/or by time (e.g., count records received every hour). In this section, we are going to discuss the semantics and the operational details of tuning these different types of streaming aggregations. At the end, we will also briefly discuss the few types of aggregations that are not supported in streaming. Let's start with aggregations not involving time.

Non-time-based Streaming Aggregations

Aggregations not involving time can be broadly classified into two categories:

Global aggregations - Aggregations across all the data in the stream. For example, say you have a stream of sensor readings as a streaming DataFrame named sensorReadings. You can calculate the running count of the total number of readings received with the following query:

# In Python
running_count = sensorReadings.groupBy().count()

// In Scala
val runningCount = sensorReadings.groupBy().count()

Note: You cannot use direct aggregation operations like DataFrame.count() and Dataset.reduce() on streaming DataFrames. This is because, for static DataFrames, these operations immediately return the final computed aggregates, whereas for streaming DataFrames the aggregates have to be continuously updated. Therefore, you always have to use DataFrame.groupBy() or Dataset.groupByKey() for aggregations on streaming DataFrames.
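To make the contrast concrete, here is a minimal sketch (assuming sensorReadings is a streaming DataFrame, as above):

// In Scala
// Fails with AnalysisException: a direct action cannot be run on a
// streaming DataFrame, because its result would need continuous updating.
// val total = sensorReadings.count()

// Works: returns another streaming DataFrame whose single row is
// incrementally updated by every micro-batch.
val runningCount = sensorReadings.groupBy().count()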

Grouped aggregations - Aggregations within each group or key present in the data stream. For example, if sensorReadings has data from multiple sensors, you can calculate the running average reading of each sensor (say, for setting up a baseline value for each sensor) with the following:

# In Python
baselineValues = sensorReadings.groupBy("sensorId").mean("value")

// In Scala
val baselineValues = sensorReadings.groupBy("sensorId").mean("value")

Besides simple counts, streaming DataFrames support the following types of aggregations (similar to batch DataFrames).

All built-in aggregation functions - sum(), mean(), stddev(), countDistinct(), collect_set(), approx_count_distinct(), etc. Refer to the online API documentation16 for more details.

Multiple aggregations computed together - You can apply multiple aggregation functions to be computed together in the following manner:

# In Python
from pyspark.sql.functions import count, mean, collect_set

multiple_aggs = (sensorReadings
  .groupBy("sensorId")
  .agg(
    count("*").alias("count"),
    mean("value").alias("baselineValue"),
    collect_set("errorCode").alias("allErrorCodes")))

// In Scala
import org.apache.spark.sql.functions.{count, mean, collect_set}

val multipleAggs = sensorReadings
  .groupBy("sensorId")
  .agg(
    count("*").alias("count"),
    mean("value").alias("baselineValue"),
    collect_set("errorCode").alias("allErrorCodes"))

User-defined aggregation functions - All user-defined aggregation functions are supported. See the online Spark SQL programming guide17 for more details on untyped and typed user-defined aggregation functions.

Regarding the execution of such streaming aggregations, we have already illustrated in previous sections how the running aggregates are maintained as distributed state. In addition to state management, there are two very important points to remember for non-time-based aggregations: the output mode to use for such queries, and planning the resource usage by state. These are discussed toward the end of this section. Next, we are going to discuss aggregations that combine data within time windows.

Aggregations with event time windows

In many cases, rather than running aggregations over the whole stream, you want aggregations over data bucketed by time windows. Continuing with our sensor example, say each sensor is expected to send at most one reading per minute and we want to detect if any sensor is reporting an unusually high number of times. To find such anomalies, we can count the number of readings received from each sensor in 5-minute intervals. In addition, for robustness, we should compute the time intervals based on when the data was generated at the sensor and not on when the data was received, as any transit delay would skew the results. In other words, we want to use the "event time", that is, the timestamp in the record representing when the reading was generated. Say the sensorReadings DataFrame has the generation timestamp as a column named eventTime. Then you can express this 5-minute count as follows:

# In Python
from pyspark.sql.functions import window

(sensorReadings
  .groupBy("sensorId", window("eventTime", "5 minute"))
  .count())

// In Scala
import org.apache.spark.sql.functions.{col, window}

sensorReadings
  .groupBy(col("sensorId"), window(col("eventTime"), "5 minute"))
  .count()

The key thing to note here is the window() function, which allows us to express the 5-minute windows as a dynamically computed grouping column. When started, this query will effectively do the following for each sensor reading:

Use the eventTime value to compute the 5-minute time window the sensor reading falls into.

Group the reading based on the composite group (sensorId, computed window).

Update the count of the composite group.

Let’s understand this with an illustrative example.

Figure 7-7. Mapping of event time to tumbling windows

Figure 7-7 shows how a few sensor readings are mapped to groups of 5-minute tumbling (i.e., non-overlapping) windows based on their event time. The two timelines show when each received event was processed by Structured Streaming, and the timestamp in the event's data (usually the time when the event was generated at the sensor). Each 5-minute window over event time is considered for the grouping based on which the counts will be calculated. Note that events may come late and out of order in terms of event time. As shown in the figure, the event with event time 12:07 was received and processed after the event with event time 12:11. However, irrespective of when an event arrives, it is assigned to the appropriate 5-minute window group based on its event time. In fact, depending on the window specification, each event can be assigned to multiple groups. For example, if you want to compute counts corresponding to 10-minute windows sliding every 5 minutes, then you can do the following:

# In Python
(sensorReadings
  .groupBy("sensorId", window("eventTime", "10 minute", "5 minute"))
  .count())

// In Scala
sensorReadings
  .groupBy(col("sensorId"), window(col("eventTime"), "10 minute", "5 minute"))
  .count()

In this query, every event will be assigned to two overlapping windows as illustrated below.

Figure 7-8. Mapping of event time to multiple, overlapping windows

Each unique tuple of (sensorId, assigned time window) is considered a dynamically generated group for which counts will be computed. For example, the event [eventTime = 12:07, sensorId = id1] gets mapped to two time windows, and therefore two groups: (12:00-12:10, id1) and (12:05-12:15, id1). The counts for these two windows are each incremented by 1. Figure 7-9 illustrates this for the previously shown events.

Figure 7-9. Updated counts in the result table after each 5-minute trigger

Assuming that the input records are processed with a trigger interval of 5 minutes, the green tables in Figure 7-9 show the state of the result table (i.e., the counts) at the end of each micro-batch. As the event time moves forward, new groups are automatically created and their aggregates automatically updated. Late and out-of-order events get handled automatically, as they simply update older groups.

However, from the point of view of resource usage, this poses a different problem: indefinitely growing state size. As new groups corresponding to the latest time windows are created, the older groups continue to occupy the state memory, waiting for any late data to update them. Even if, in practice, there is a bound on how late the input data can be (e.g., data cannot be more than seven days late), the query does not know that information. Hence, it does not know when to consider a window "too old to receive updates" and drop it from the state. To provide a lateness bound to the query (and prevent unbounded state), you can specify watermarks, as we discuss next.

Handling late data with watermarks

A watermark is defined as a moving threshold in event time that trails behind the maximum event time seen by the query in the processed data. The trailing gap, known as the watermark delay, defines how long the engine will wait for late data to arrive. By knowing the point at which no more data will arrive for a given group, the engine can automatically finalize the aggregates of certain groups and drop them from the state. This limits the total amount of state the engine has to maintain to compute the results of the query.

For example, suppose you know that your sensor data will not be late by more than 10 minutes. Then you can set the watermark as follows:

# In Python
(sensorReadings
  .withWatermark("eventTime", "10 minutes")
  .groupBy("sensorId", window("eventTime", "10 minutes", "5 minutes"))
  .mean("value"))

// In Scala
sensorReadings
  .withWatermark("eventTime", "10 minutes")
  .groupBy(col("sensorId"), window(col("eventTime"), "10 minutes", "5 minutes"))
  .mean("value")

Note that you must call withWatermark() before the groupBy(), and on the same timestamp column as that used to define windows. When this query is executed, Structured Streaming will continuously track the maximum observed value of the eventTime column, and accordingly update the watermark, filter out the "too late" data, and clear old state. That is, any data late by more than 10 minutes will be ignored, and all time windows that are more than 10 minutes older than the latest (by event time) input data will be cleaned up from the state. To clarify how this query will be executed, consider the timeline in Figure 7-10 showing how a selection of input records was processed.

Figure 7-10. Illustration showing how the engine tracks the maximum event time across events, updates the watermark and accordingly handles late data

Figure 7-10 shows a two-dimensional plot of records processed in terms of their processing times (x-axis) and their event times (y-axis). The records are marked with yellow and red circles, and were processed in micro-batches of 5 minutes. The green tables show the state of the result table after each micro-batch completes.

Each record was received and processed after all the records to its left. Consider the two records [12:15, id1] (processed around 12:17) and [12:13, id3] (processed around 12:18). The record for id3 was considered late (and therefore marked in solid red) because it was generated by the sensor before the record for id1 but was processed after it. However, in the micro-batch for the processing-time range 12:15-12:20, the watermark used was 12:04, calculated from the maximum event time seen until the previous micro-batch (that is, 12:14 minus the 10-minute watermark delay). Therefore, the late record [12:13, id3] was not considered too late and was successfully counted. In contrast, in the next micro-batch, the record [12:04, id1] was considered too late and was discarded.

You can set the watermark delay based on the requirements of your application: larger values allow data to arrive later, but at the cost of increased state size (i.e., memory usage), and vice versa.

Semantic Guarantees with Watermarks

Before we conclude this section on watermarks, let's consider the precise semantic guarantee that watermarking provides. A watermark of 10 minutes guarantees that the engine will never drop any data that is delayed by less than 10 minutes compared to the latest event time seen in the input data. However, the guarantee is strict only in one direction: data delayed by more than 10 minutes is not guaranteed to be dropped, that is, it may get aggregated. Whether an input record more than 10 minutes late will actually be aggregated or not depends on the exact timing of when the record is received and when the micro-batch processing it is triggered.

Supported Output Modes

Unlike streaming aggregations without time, aggregations with time windows can use all three output modes. However, there are implications regarding state cleanup that you need to be aware of, depending on the mode:

Update mode - In this mode, every micro-batch will output only the rows where the aggregate got updated. This mode can be used with all types of aggregations. Specifically for time-window aggregations, watermarking will ensure that the state gets cleaned up regularly. This is the most useful and efficient mode for running queries with streaming aggregations (see the sketch after this list). However, you cannot use this mode to write aggregates to append-only streaming sinks, such as any file-based format like Parquet or ORC (unless you use Delta Lake, which we will discuss in the next chapter).

Complete mode - In this mode, every micro-batch will output all the aggregates, irrespective of how old they are or whether they have changed. While this mode can be used with all types of aggregations, for time-window aggregations the state will not be cleaned up even if a watermark is specified. Outputting all the aggregates requires all past state, and hence the aggregation data must be preserved even if a watermark has been defined. Use this mode on time-window aggregations with caution, as it can lead to an indefinite increase in state size and memory usage (discussed in more detail in the next subsection).

Append mode - This mode can be used only with aggregations on event-time windows and with watermarking enabled. Recall that Append mode does not allow previously output results to change. For any aggregation without a watermark, every aggregate may be updated by future data, and hence its rows cannot be output in Append mode. Only when watermarking is enabled on aggregations over event-time windows does the query know when an aggregate is not going to be updated any further. Hence, instead of outputting the updated rows, Append mode outputs each key and its final aggregate value only when the watermark ensures that the aggregate will not be updated again. The advantage of this mode is that it allows you to write aggregates to append-only streaming sinks (e.g., files). The disadvantage is that the output will be delayed by the watermark duration -- the query has to wait for the trailing watermark to exceed the time window of a key before its aggregate can be finalized.
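As a concrete sketch, here is how the earlier 5-minute windowed count might be run in Update mode with a watermark (the console sink and checkpoint path are illustrative assumptions):

// In Scala
import org.apache.spark.sql.functions.{col, window}

val windowedCounts = sensorReadings
  .withWatermark("eventTime", "10 minutes")
  .groupBy(col("sensorId"), window(col("eventTime"), "5 minutes"))
  .count()

windowedCounts.writeStream
  .outputMode("update")  // output only the windows whose counts changed;
                         // the watermark lets old windows be dropped from state
  .format("console")
  .option("checkpointLocation", "/tmp/windowed-counts-checkpoint")
  .start()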

Streaming Joins

Structured Streaming supports joining a streaming dataset with another static or streaming dataset. In this section we will explore which types of joins (inner, outer, etc.) are supported, and how to use watermarks to limit the state stored for stateful joins. We will start with the simpler case: joining a data stream with a static dataset.

Stream-static Joins

Many use cases require joining a data stream with a static dataset. For example, let's consider the case of ad monetization. Suppose you are an advertisement company that shows ads on websites, and you make money when users click on them. Let's assume that you have a static dataset of all the ads to be shown (known as impressions) and another stream of events for each time users click on the displayed ads. To calculate the click revenue, you have to match each click in the event stream to the corresponding ad impression in the table. Let's first represent the data as two DataFrames, one static and one streaming, as shown below:

# In Python
# Static DataFrame [adId: String, impressionTime: Timestamp, …]
# reading from your static data source
impressionsStatic = spark.read. …

# Streaming DataFrame [adId: String, clickTime: Timestamp, …]
# reading from your streaming source
clicksStream = spark.readStream. …

// In Scala
// Static DataFrame [adId: String, impressionTime: Timestamp, …]
// reading from your static data source
val impressionsStatic = spark.read. …

// Streaming DataFrame [adId: String, clickTime: Timestamp, …]
// reading from your streaming source
val clicksStream = spark.readStream. …

To match the clicks with the impressions, you can simply apply an inner equi-join between them using the common adId column:

# In Python
matched = clicksStream.join(impressionsStatic, "adId")

// In Scala
val matched = clicksStream.join(impressionsStatic, "adId")

This is the same code you would have written if both impressions and clicks were static DataFrames -- the only difference is spark.read (for batch) versus spark.readStream (for a stream). When this code is executed, every micro-batch of clicks is inner-joined against the static impressions table to generate the output stream of matched events.

Besides inner join, Structured Streaming also supports two types of stream-static outer joins. They are as follows:

Left outer join when the left side is a streaming DataFrame

Right outer join when the right side is a streaming DataFrame

The other kinds of outer joins (e.g., full outer, or left outer with the streaming DataFrame on the right) are not supported because they are not easy to run incrementally. In the two supported cases, the code is exactly the same as it would be for a left/right outer join between two static DataFrames:

# In Python
matched = clicksStream.join(impressionsStatic, "adId", "leftOuter")

// In Scala
val matched = clicksStream.join(impressionsStatic, Seq("adId"), "leftOuter")

There are a few key points to note about stream-static joins.

Stream-static joins are stateless operations, and therefore do not require any kind of watermarking.

The static DataFrame is read repeatedly while joining with the streaming data of every micro-batch, so you can cache the static DataFrame to speed up the reads (see the sketch after this list).

If the underlying data in the data source on which the static DataFrame was defined changes, whether those changes are seen by the streaming query depends on the specific behavior of the data source. For example, if the static DataFrame was defined on files, then changes to those files (e.g., appends) will not be picked up until the streaming query is restarted.
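For example, here is a minimal sketch of the caching point above (the Parquet path is an illustrative assumption):

// In Scala
// Cache the static side once so that every micro-batch's join reads it
// from memory instead of re-reading the underlying files.
val impressionsStatic = spark.read.parquet("/data/impressions").cache()
val matched = clicksStream.join(impressionsStatic, "adId")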

In this stream-static example of ad monetization, we made a significant assumption: that the impressions table is static. In reality, there will be a stream of new impressions generated as new ads are displayed. While stream-static joins are good for enriching data in one stream with additional static (or slowly changing) information, they are insufficient when both sources of data are changing rapidly. For that you need stream-stream joins, which we discuss next.

Stream-stream Joins

The challenge of generating join results between two data streams is that, at any point in time, the view of either dataset is incomplete, making it much harder to find matches between inputs. The matching events from the two streams may arrive in any order and may be arbitrarily delayed. For example, an impression event and its corresponding click event may arrive out of order, with arbitrary delays between them. Structured Streaming accounts for such delays by buffering the input data from both sides as streaming state, and continuously checking for matches as new data is received. The conceptual idea is sketched out in Figure 7-11.

Let’s understand this in more detail, first with inner joins and then with outer joins.

Figure 7-11. Ad monetization using stream-stream join

Inner joins with optional watermarking

Say we have redefined our impressions DataFrame to be a streaming DataFrame. To get the stream of matching impressions and their corresponding clicks, you can use the same code as we used earlier for static joins and stream-static joins:

# In Python
# Streaming DataFrame [adId: String, impressionTime: Timestamp, …]
impressions = spark.readStream. …

# Streaming DataFrame [adId: String, clickTime: Timestamp, …]
clicks = spark.readStream. …

matched = impressions.join(clicks, "adId")

// In Scala
// Streaming DataFrame [adId: String, impressionTime: Timestamp, …]
val impressions = spark.readStream. …

// Streaming DataFrame [adId: String, clickTime: Timestamp, …]
val clicks = spark.readStream. …

val matched = impressions.join(clicks, "adId")

Even though the code is the same, the execution is completely different. When this query is executed, the processing engine will recognize it as a stream-stream join instead of a stream-static join. The engine will buffer all clicks and impressions as state, and will generate a matched impression-and-click pair as soon as a received click matches a buffered impression (or vice versa, depending on which was received first). Let's visualize how this inner join works using the example timeline of events in Figure 7-12.

Figure 7-12. Illustrative timeline of clicks, impressions and their joined output

In Figure 7-12, the blue dots represent the event times of the impression and click events that were received across different micro-batches (separated by the dashed grey lines). For the purposes of this illustration, assume that each event was actually received at the same wall-clock time as its event time. Note the different scenarios under which the related events are joined. For one of the ads, both events were received in the same micro-batch, so their joined output was generated by that micro-batch. For another ad, however, the impression was received at 12:04, much earlier than its corresponding click at 12:13. Structured Streaming would first receive the impression at 12:04 and buffer it in the state. For each received click, the engine tries to join it with all buffered impressions (and vice versa). Eventually, in a later micro-batch running around 12:13, the engine would receive the click and generate the joined output.

However, in this query, we have given no indication of how long the engine should buffer an event to find a match. Therefore, the engine may buffer events forever and accumulate an unbounded amount of streaming state. To limit the streaming state maintained by stream-stream joins, you need to know the following about your use case:

What is the time range between the generation of the two events at their respective sources? In the context of our use case, let’s assume that a click can occur within 0 seconds to 1 hour after the corresponding impression.

What is the maximum duration an event can be delayed in transit between the source and the processing engine? For example, ad clicks from a browser may get delayed due to intermittent connectivity and may arrive much later than expected, and out of order. Let's say that impressions and clicks can be delayed by at most two and three hours, respectively.

These delay limits and event time constraints between matching events can be encoded in the DataFrame operations using watermarks and time range conditions. In other words, you will have to do the following additional steps in the join to ensure state cleanup.

Define watermark delays on both inputs such that the engine knows how delayed the input can be (similar to streaming aggregations).

Define a constraint on event time across the two inputs, such that the engine can figure out when old rows of one input are not going to be required (i.e., will not satisfy the time constraint) for matches with the other input. This constraint can be defined in either of the following ways:

Time range join conditions (e.g., join condition = “leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR”)

Join on event-time windows (e.g., join condition = “leftTimeWindow = rightTimeWindow”).

In our advertisement use case, the inner join code gets a little more complicated, as shown below:

# In Python
from pyspark.sql.functions import expr

# Define watermarks
impressions_with_watermark = (impressions
  .selectExpr("adId AS impressionAdId", "impressionTime")
  .withWatermark("impressionTime", "2 hours"))

clicks_with_watermark = (clicks
  .selectExpr("adId AS clickAdId", "clickTime")
  .withWatermark("clickTime", "3 hours"))

# Inner join with time range conditions
(impressions_with_watermark.join(
  clicks_with_watermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime BETWEEN impressionTime AND impressionTime + interval 1 hour""")))

// In Scala
import org.apache.spark.sql.functions.expr

// Define watermarks
val impressionsWithWatermark = impressions
  .selectExpr("adId AS impressionAdId", "impressionTime")
  .withWatermark("impressionTime", "2 hours")

val clicksWithWatermark = clicks
  .selectExpr("adId AS clickAdId", "clickTime")
  .withWatermark("clickTime", "3 hours")

// Inner join with time range conditions
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime BETWEEN impressionTime AND impressionTime + interval 1 hour"""))

With these time constraints for each event, the processing engine can automatically calculate how long events need to be buffered to generate correct results, and when they can be dropped from the state. For example, it will evaluate the following (illustrated in Figure 7-13):

Impressions need to be buffered for at most four hours (in event time), as a click delayed by three hours may match with an impression made up to four hours earlier (i.e., three hours of delay plus up to one hour between the impression and the click).

Conversely, clicks need to be buffered for at most two hours (in event time), as an impression delayed by two hours may match with a click received two hours earlier.

Figure 7-13. Structured Streaming automatically calculates thresholds for state clean up using watermark delays and time range conditions

There are a few key points to remember about inner joins.

For inner joins, specifying watermarks and event-time constraints are both optional. In other words, at the risk of potentially unbounded state, you may choose not to specify them. Only when both are specified will you get state cleanup.

Similar to the guarantees provided by watermarking on aggregations, a watermark delay of two hours guarantees that the engine will never drop or fail to match any data that is less than two hours delayed, but data delayed by more than two hours may or may not get processed.

Outer joins with Watermarking

The earlier inner join will output only those ads for which both events have been received. In other words, ads that received no clicks would not be reported at all. Instead, you may want all ad impressions to be reported, with or without the associated click data, to enable additional analysis later (e.g., click-through rates). This brings us to stream-stream outer joins. All you need to do is specify the outer join type:

# In Python
# Left outer join with time range conditions
(impressions_with_watermark.join(
  clicks_with_watermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime BETWEEN impressionTime AND impressionTime + interval 1 hour"""),
  "leftOuter"))  # only change: set the outer join type

// In Scala
// Left outer join with time range conditions
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime BETWEEN impressionTime AND impressionTime + interval 1 hour"""),
  "leftOuter")  // only change: set the outer join type

As expected of outer joins, this query will start generating output for every impression, with or without (i.e., with NULLs for) the click data. However, there are a few additional points to note about outer joins:

Unlike with inner joins, the watermark delays and event-time constraints are not optional for outer joins. This is because, for generating the NULL results, the engine must know when an event is not going to match with anything else in the future. Hence, the watermarks and event-time constraints must be specified to generate correct outer join results (along with state cleanup).

Consequently, the outer NULL results will be generated with a delay, as the engine has to wait a while to ensure that there were not, and will not be, any matches. This delay is the maximum buffering time (with respect to event time) calculated by the engine for each event, as discussed in the previous section (i.e., four hours for impressions and two hours for clicks).

Arbitrary Stateful Computations

Many use cases require more complicated logic than the SQL operations we have discussed so far. For example, say you want to track the statuses (e.g., signed in, busy, idle) of users in real time by tracking their activities (e.g., clicks). To build such stream processing pipelines, you will have to track each user's activity history as state with an arbitrary data structure, and continuously apply arbitrarily complex changes to that data structure based on the user's actions. The operation mapGroupsWithState and its more flexible counterpart flatMapGroupsWithState are designed for such complex analytical use cases.

In this section, we will start with a simple example using mapGroupsWithState to understand the four key steps for modeling custom state data and defining custom operations on it. Then we will discuss the concept of timeouts and how you can use them to expire state that has not been updated for a while. We will end with flatMapGroupsWithState, which gives you even more flexibility.

Note: As of Spark 3.0, these two operations are only available in Scala and Java.

Modeling Arbitrary Stateful Operations with mapGroupsWithState

State with an arbitrary schema, and arbitrary transformations on that state, are modeled as a user-defined function that takes the previous version of the state value and the new data as inputs, and generates the updated state and the computed result as outputs. Programmatically in Scala, you have to define a function with the following signature (K, V, S, and U are data types, as explained shortly):

// In Scala
def arbitraryStateUpdateFunction(
    key: K,
    newDataForKey: Iterator[V],
    previousStateForKey: GroupState[S]): U

This function is provided to a streaming query using the operations groupByKey() and mapGroupsWithState(), as follows:

// In Scala
val inputDataset: Dataset[V] = … // input streaming Dataset

inputDataset
  .groupByKey(keyFunction)  // keyFunction() generates a key from the input
  .mapGroupsWithState(arbitraryStateUpdateFunction)

When this streaming query is started, in each micro-batch Spark will call this arbitraryStateUpdateFunction() once for each unique key in the micro-batch's data. Let's take a closer look at what the parameters are and what values Spark will call the function with:

key: K - K is the data type of the common keys defined in state and the input. Spark will call this function for each unique key in the data.

newDataForKey: Iterator[V] - V is the data type of the input dataset. When Spark calls this function for a key, this parameter will have all the new input data corresponding to that key. Note that there is no defined order in which the input data objects will be present in the iterator.

previousStateForKey: GroupState[S] - S is the data type of the arbitrary state you are going to maintain, and GroupState[S] is a typed wrapper object that provides methods to access and manage the state value. When Spark calls this function for a key, this object provides the state value set the previous time Spark called this function for that key (i.e., in one of the previous micro-batches).

U is the data type of the output of the function.

There is one additional requirement: all the types K, V, S, and U must be encodable by Spark SQL's encoders. Accordingly, in mapGroupsWithState, you have to provide the typed encoders for S and U, either implicitly in Scala or explicitly in Java. See the section on Dataset encoders in Chapter 6 for more details.
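For instance, here is a minimal sketch of how these encoders are typically satisfied in Scala (assuming spark is the active SparkSession and UserStatus is a case class, as defined in the next step):

// In Scala
// Implicit encoders for case classes and common types come with this import.
import spark.implicits._

// The explicit alternative, closer to what the Java API requires:
import org.apache.spark.sql.{Encoder, Encoders}
val stateEncoder: Encoder[UserStatus] = Encoders.product[UserStatus]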

Let's see how to express the desired state update function in this format with an example. Say we want to understand user behavior based on user actions. Conceptually, it is quite simple: in every micro-batch, for each active user, we will use the new actions performed by that user to update the user's "status." Programmatically, you can define the state update function with the following steps.

Define your data types - That is, define the exact data types of K, V, S, U. In this case, we are going to choose the following

Input data (V) = case class UserAction(userId: String, action: String)

Keys (K) = String, that is, the userId

State (S) = case class UserStatus(userId: String, active: Boolean)

Output (U) = UserStatus, as we want to output the latest user status

Note that all these data types are supported by encoders.
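The classes above leave out the helper logic that the upcoming update function relies on. Here is one hypothetical way to flesh them out; the default values and the updateWith() rule are illustrative assumptions, not part of the original example:

// In Scala
case class UserAction(userId: String, action: String)

case class UserStatus(userId: String = "", var active: Boolean = false) {
  // Hypothetical rule: any action other than "idle" marks the user active.
  def updateWith(action: UserAction): Unit = {
    active = action.action != "idle"
  }
  def asInactive(): UserStatus = this.copy(active = false)
}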

Define your function - Based on the chosen types, let's translate the conceptual idea into code. When this function is called with new user actions, there are two main situations we need to handle: whether a previous state (i.e., a previous user status) exists for that key or not. Accordingly, we will either initialize the user's status or update the existing status with the new actions. We will explicitly update the state with the new status, and finally return the updated status.

// In Scala
def updateUserStatus(
    userId: String,
    newActions: Iterator[UserAction],
    state: GroupState[UserStatus]): UserStatus = {

  val userStatus = state.getOption.getOrElse { new UserStatus() }
  newActions.foreach { action => userStatus.updateWith(action) }
  state.update(userStatus)
  return userStatus
}

Apply your function on the actions - You group the input actions Dataset using groupByKey() and then apply the function using mapGroupsWithState():

// In Scala
val userActions: Dataset[UserAction] = …
val latestStatuses = userActions
  .groupByKey(userAction => userAction.userId)
  .mapGroupsWithState(updateUserStatus)

Once you start this streaming query with console output, you will see the updated user statuses being printed. Before we move on to more advanced topics, there are a few notable points to remember:

When the function is called, there is no well-defined order for the input records in the new-data iterator (e.g., newActions). If you need to update the state with the input records in a specific order (e.g., in the order in which the actions were performed), then you have to explicitly reorder them (e.g., based on the event timestamp or some other ordering ID). In fact, if there is a possibility that actions may be read out of order from the source, then you have to consider the possibility that a future micro-batch may receive data that should be processed before the data in the current batch. In that case, you have to buffer the records as part of the state.

In a micro-batch, the function is called on a key only once, and only if the micro-batch has data for that key. For example, if a user becomes inactive and provides no new actions for a long time, then by default the function will not be called for a long time. If you want to update or remove the state based on a user's inactivity over an extended period, you have to use timeouts, which we discuss in the next section.

The output of mapGroupsWithState is assumed by the incremental processing engine to be continuously updated key-value records, similar to the output of aggregations. This limits what operations are supported in the query after mapGroupsWithState and what sinks are supported. For example, appending the output into files is not supported. If you want to apply arbitrary stateful operations with greater flexibility, then you have to use flatMapGroupsWithState. We will discuss this after timeouts.

Using timeouts to manage inactive groups

In the preceding example of tracking active user sessions, as more users become active, the number of keys in the state will keep increasing, and so will the memory used by the state. In a real-world scenario, users are likely not going to stay active all the time. It may not be very useful to keep the status of inactive users in the state, as it is not going to change until those users become active again. Hence, we may want to explicitly drop all information for inactive users. However, a user may not take any explicit action to become inactive (e.g., explicitly logging off), so we may have to define inactivity as the absence of any action for a threshold duration. This becomes tricky to encode in the function, as the function is not called for a user until there are new actions from that user.

To encode time-based inactivity, mapGroupsWithState supports timeouts that are defined as follows.

Each time the function is called on a key, a timeout can be set on the key based on a duration, or a threshold timestamp.

If that key does not receive any data before the timeout condition (i.e., the duration or the timestamp) is hit, the key is marked as "timed out." The next micro-batch will call the function on this timed-out key even if there is no data for that key in that micro-batch. In this special function call, the new-data iterator will be empty (since there is no new data) and GroupState.hasTimedOut will return true. The latter is the best way to identify, inside the function, whether the call was due to new data or a timeout.

Now, there are two types of timeouts based on two notions of time: processing time and event time. The processing-time timeout is the simpler one of the two to use, so let’s see how to use it first.

Processing-time Timeouts

A processing-time timeout is based on the system time (also known as wall-clock time) of the machine running the streaming query, and is defined as follows: if a key last received data at system timestamp T, and the current timestamp is more than (T + timeout duration), then the function will be called again on that key with an empty new-data iterator.

Let's see how to use timeouts by updating our user example to remove a user's state after one hour of inactivity. We will make three changes:

In mapGroupsWithState, we will specify the timeout as ProcessingTimeTimeout.

In the state update function, before updating the state with new data, we have to check whether the state has timed out or not. Accordingly, we will update or remove the state.

In addition, every time we update the state with new data, we will set the timeout duration.

// In Scala
def updateUserStatus(
    userId: String,
    newActions: Iterator[UserAction],
    state: GroupState[UserStatus]): UserStatus = {

  if (!state.hasTimedOut) {  // was not called due to timeout
    val userStatus = state.getOption.getOrElse { new UserStatus() }
    newActions.foreach { action => userStatus.updateWith(action) }
    state.update(userStatus)
    state.setTimeoutDuration("1 hour")  // set the processing-time timeout duration
    return userStatus
  } else {
    val userStatus = state.get()
    state.remove()                   // remove state when timed out
    return userStatus.asInactive()   // return the inactive user's status
  }
}

val latestStatuses = userActions
  .groupByKey(userAction => userAction.userId)
  .mapGroupsWithState(
    GroupStateTimeout.ProcessingTimeTimeout)(updateUserStatus)

This query will automatically clean up the state of users for whom the query has not processed any data for more than an hour. However, there are a few points to note about timeouts:

The timeout set by the last call to the function is automatically cancelled when the function is called again, whether for new data or for a timeout. Hence, whenever the function is called, the timeout duration or timestamp needs to be explicitly set again to enable the timeout.

Since the timeouts are processed during the micro-batches, the timing of the timeout is imprecise and depends heavily on the trigger interval and micro-batch processing times. Therefore, it is not advised to use timeouts for precise timing control.

While processing-time timeouts are simple to reason about, they are not robust to slowdowns and downtimes. If the streaming query suffers a downtime of more than one hour, then after restart all the keys in the state will time out, because by the system clock more than one hour will have passed since each key last received data. Similar wide-scale timeouts can occur if the query processes data slower than it is arriving at the source (e.g., if data is arriving and getting buffered in Kafka). For example, if the timeout is five minutes, then a sudden drop in processing rate (or spike in data arrival rate) that causes a five-minute lag could produce spurious timeouts. To avoid such issues, we can use event-time timeouts, which we discuss next.

Event-time Timeouts

Instead of the system clock time, an event-time timeout is based on the event time in the data (similar to time-based aggregations) and a watermark defined on that event time. If a key is configured with a specific timeout timestamp of T (i.e., not a duration), then that key will time out when the watermark exceeds T and no new data has been received for that key since the last time the function was called. Recall that the watermark is a moving threshold that lags behind the maximum event time seen while processing the data. Hence, unlike the system time, the watermark moves forward in time at the same rate as the data is processed. This means (unlike with processing-time timeouts) any slowdown or downtime in query processing will not cause spurious timeouts.

Let's modify our example to use an event-time timeout. In addition to the changes we already made for the processing-time timeout, we will make the following changes:

Define watermarks on the input dataset (assume that the class UserAction has an eventTimestamp field). Set the watermark delay to an acceptable threshold by which input data can be late and out of order.

Update mapGroupsWithState to use EventTimeTimeout.

Update the function to set the threshold timestamp at which the timeout will occur. Note that event-time timeouts do not allow setting a timeout duration, as processing-time timeouts do. We will discuss the reason for this later. In this example, we will calculate the timeout timestamp as the current watermark + 1 hour.

Here is the updated example.

// In Scala
def updateUserStatus(
    userId: String,
    newActions: Iterator[UserAction],
    state: GroupState[UserStatus]): UserStatus = {

  if (!state.hasTimedOut) {  // was not called due to timeout
    val userStatus = state.getOption.getOrElse { new UserStatus() }
    newActions.foreach { action => userStatus.updateWith(action) }
    state.update(userStatus)

    // set the timeout timestamp to the current watermark + 1 hour
    state.setTimeoutTimestamp(state.getCurrentWatermarkMs, "1 hour")
    return userStatus
  } else {
    val userStatus = state.get()
    state.remove()
    return userStatus.asInactive()
  }
}

val latestStatuses = userActions
  .withWatermark("eventTimestamp", "10 minutes")  // define the watermark
  .groupByKey(userAction => userAction.userId)
  .mapGroupsWithState(
    GroupStateTimeout.EventTimeTimeout)(updateUserStatus)  // use event-time timeout

This query will be much more robust to spurious timeouts caused by restarts and processing delays. Here are a few points to note about event-time timeouts:

Unlike in the previous example with processing-time timeouts, we have used GroupState.setTimeoutTimestamp() instead of GroupState.setTimeoutDuration(). This is because with processing-time timeouts, the duration is sufficient to calculate the exact future timestamp (i.e., current system time + specified duration) when the timeout would occur. This is not the case for event time, as different applications may want to use different strategies to calculate the threshold timestamp. In this example, we simply calculated the timeout timestamp based on the current watermark. A different application may instead choose to calculate a key's timeout timestamp based on the maximum event-time timestamp seen for that key (tracked and saved as part of the state).

The timeout timestamp must be set to a value larger than the current watermark. This is because the timeout is expected to happen when the timestamp crosses the watermark, so it is illogical to set the timeout timestamp to a value the watermark has already crossed.

Before we move on from timeouts, one last thing to remember is that you can use these timeout mechanisms for more creative processing than just fixed-duration timeouts. For example, you can implement an approximately periodic task (say, every hour) on the state by saving the last task-execution timestamp in the state and using it to set the processing-time timeout duration, as shown in this code snippet:

// In Scala
val timeoutDurationMs =
  lastTaskTimestampMs + periodIntervalMs - groupState.getCurrentProcessingTimeMs()
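Put together, a hypothetical sketch of such a periodic task inside a processing-time-timeout handler might look like this; the lastTaskTimestampMs bookkeeping and runPeriodicTask() are assumptions for illustration:

// In Scala
import org.apache.spark.sql.streaming.GroupState

val periodIntervalMs = 60 * 60 * 1000L  // run the task roughly every hour

def runPeriodicTask(state: GroupState[UserStatus]): Unit = {
  // ... hypothetical per-key maintenance work on the state ...
}

def onTimeout(state: GroupState[UserStatus], lastTaskTimestampMs: Long): Unit = {
  runPeriodicTask(state)
  // Re-arm the timeout relative to when the task last ran, not "now",
  // so that the schedule does not drift with processing delays.
  val timeoutDurationMs =
    lastTaskTimestampMs + periodIntervalMs - state.getCurrentProcessingTimeMs()
  state.setTimeoutDuration(math.max(timeoutDurationMs, 1L))
}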

Generalization with flatMapGroupsWithState

There are two key limitations of mapGroupsWithState that restrict the flexibility needed to implement more complex use cases (e.g., chained sessionizations):

Every time mapGroupsWithState's function is called, you have to return one and only one record. For some applications, in some triggers, you may not want to output anything at all.

With mapGroupsWithState, due to the lack of more information about the opaque state update function, the engine assumes that generated records are updated key-value data pairs. Accordingly, the engine reasons about downstream operations and allows or disallows some of them. For example, the DataFrame generated using mapGroupsWithState cannot be written out in append mode to files. However, some applications may want to generate records that can be considered as appends.

flatMapGroupsWithState overcomes these limitations at the cost of a slightly more complex syntax. It has two differences from mapGroupsWithState.

Returns an iterator from function: The return type of the state update function is an iterator instead of a single object. This allows the function to return any number of records, and if needed, no records at all.

Operator Output Mode: flatMapGroupsWithState takes another parameter called the operator output mode (not to be confused with the query’s output mode we discussed earlier in the chapter). This mode defines whether the output records are new records that can be appended (i.e., OutputMode.Append) or updated key-value records (i.e., OutputMode.Update).

Let's understand this by extending our user tracking example. Say we want to generate alerts only for certain user status changes, and we want to write these alerts to files:

// In Scala
def getUserAlerts(
    userId: String,
    newActions: Iterator[UserAction],
    state: GroupState[UserStatus]): Iterator[UserAlert] = {

  if (!state.hasTimedOut) {  // was not called due to timeout
    val userStatus = state.getOption.getOrElse(new UserStatus())
    val alerts: Iterator[UserAlert] = newActions.flatMap { action =>
      val shouldAlert = userStatus.updateWith(action)
      if (shouldAlert) Some(UserAlert(userStatus)) else None
    }
    ... // update state and set timeouts
    return alerts
  } else {
    state.remove()
    return Iterator.empty  // return nothing
  }
}

val userAlerts = userActions
  .withWatermark("eventTimestamp", "10 minutes")
  .groupByKey(userAction => userAction.userId)
  .flatMapGroupsWithState(
    OutputMode.Append,
    GroupStateTimeout.EventTimeTimeout)(getUserAlerts)

userAlerts.writeStream
  .format("parquet")
  .start("/tmp/alerts")

Performance Tuning

Structured Streaming uses the Spark SQL engine, and therefore can be tuned with the same parameters as those discussed for Spark SQL earlier in the book. However, unlike batch jobs that may process gigabytes to terabytes of data, micro-batch jobs usually process much smaller volumes of data. Hence, a Spark cluster running streaming queries usually needs to be tuned slightly differently. Here are a few considerations worth noting:

Cluster resource provisioning - Since Spark clusters running streaming queries run 24/7, it is important to provision resources appropriately. Underprovisioning can cause the streaming queries to fall behind (with micro-batches taking longer and longer), while overprovisioning (e.g., allocated but unused cores) can cause unnecessary costs. Furthermore, allocation should account for memory or cores depending on the nature of the streaming queries:

Non-stateful queries usually need more cores.

Stateful queries usually need more memory.

Number of partitions for shuffles - For Structured Streaming queries, the number of shuffle partitions usually needs to be set much lower than for most batch queries; dividing the computation too finely increases overheads and reduces throughput. Furthermore, shuffles due to stateful operations have significantly higher task overheads because of checkpointing. Hence, for streaming queries with stateful operations and trigger intervals of a few seconds to minutes, it is recommended to tune the number of shuffle partitions down from the default value of 200 to at most two to three times the number of allocated cores, as sketched below.
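For example, a minimal sketch for a cluster with, say, eight allocated cores (the exact number is an illustrative assumption):

// In Scala
// Cap shuffle partitions at ~2x the allocated cores instead of the
// default 200, to keep per-micro-batch task overheads low.
spark.conf.set("spark.sql.shuffle.partitions", "16")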

Setting source rate limits for stability - After the allocated resources and configurations have been optimized for a query's expected input data rates, sudden surges in data rates can still generate unexpectedly large jobs and subsequent instability. Besides the costly approach of overprovisioning, you can safeguard against instability using source rate limits. Setting limits in supported sources (e.g., Kafka and files) prevents a query from consuming too much data in a single micro-batch. The surge data will stay buffered in the source, and the query will eventually catch up (see the sketch after this list). However, note the following:

Setting the limit too low can cause the query to underutilize allocated resources and fall behind the input rate.

Limits do not effectively guard against sustained increases in the input rate. While stability is maintained, the volume of buffered, unprocessed data will grow indefinitely at the source, and so will the end-to-end latencies.
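Here is a sketch of setting rate limits on two supported sources; the broker address, topic, paths, limits, and schema are illustrative assumptions:

// In Scala
// Kafka source: cap the total number of offsets consumed per micro-batch.
val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "events")
  .option("maxOffsetsPerTrigger", "100000")
  .load()

// File source: cap the number of new files read per micro-batch.
val fileStream = spark.readStream
  .format("json")
  .schema(jsonSchema)  // assumes a schema defined elsewhere
  .option("maxFilesPerTrigger", "10")
  .load("/data/input")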

Multiple streaming queries in the same Spark application - Running multiple streaming queries in the same SparkContext can lead to fine-grained resource sharing. However, there are a few points to note:

Executing each query continuously uses resources in the Spark driver (i.e., the JVM where the SparkContext runs). This limits the number of queries that the driver can execute simultaneously. Hitting those limits can either bottleneck the task scheduling (i.e., underutilize the executors) or exceed memory limits.

You can ensure fairer resource allocation between queries in the same context by setting them to run in separate scheduler pools. Set the SparkContext's local property spark.scheduler.pool to a different string value for each stream:

// In Scala
// Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").format("parquet").start(path1)

// Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").format("orc").start(path2)

# In Python
# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").format("parquet").start(path1)

# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").format("parquet").start(path2)

Summary

To recap, this chapter explored how you can write Structured Streaming queries using DataFrame APIs. Specifically, we discussed the following.

The central philosophy of Structured Streaming and the processing model of treating input data streams as unbounded tables.

The key steps to define, start, restart and monitor streaming queries.

The recipes to use various built-in streaming sources and sinks, and to write custom streaming sinks.

The details of how to use and tune managed stateful operations like streaming aggregations and stream-stream joins.

The techniques of how to express custom stateful computations.

Through the code snippets in this chapter and the notebooks in the Learning Spark, 2nd Edition GitHub repository18, you will continue to get a feel for how to use Structured Streaming. In the next chapter, we explore how Delta Lake makes it convenient to store the processed data and manage it with transactional guarantees.

1 For a more detailed explanation, see the original research paper "Discretized Streams: Fault-Tolerant Streaming Computation at Scale," Zaharia et al., published at SOSP 2013.

2 Structured Streaming Programming Guide - https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

3 https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html

4 This execution loop runs for micro-batch based trigger modes (i.e., ProcessingTime and Once) and not the Continuous trigger mode.

5 Structured Streaming Programming Guide - https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing

6 Dropwizard Metrics - https://metrics.dropwizard.io

7 Spark Monitoring Guide - https://spark.apache.org/docs/latest/monitoring.html#metrics

8 https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#creating-streaming-dataframes-and-streaming-datasets

9 https://kafka.apache.org/

10 https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

11 https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

12 http://cassandra.apache.org/

13 https://github.com/datastax/spark-cassandra-connector

14 For the full list of unsupported operations, see the relevant section in the online Structured Streaming Programming Guide - https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#unsupported-operations

15 https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreach

16 Directory having links to language-specific API docs - https://spark.apache.org/docs/latest/api/

17 Spark SQL Guide - https://spark.apache.org/docs/latest/sql-getting-started.html#aggregations

18 https://github.com/databricks/LearningSparkV2
