We will create a simple stateful PySpark Streaming application using the following code, saved as stateful_streaming_word_count.py:
#
# stateful_streaming_word_count.py
#
# Import the necessary classes and create a local SparkContext and StreamingContext
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Create Spark Context with two working threads (note, `local[2]`)
sc = SparkContext("local[2]", "StatefulNetworkWordCount")
# Create a local StreamingContext with a batch interval of 1 second
ssc = StreamingContext(sc, 1)
# Set the checkpoint directory for the local StreamingContext (stateful operations such as `updateStateByKey` require it)
ssc.checkpoint("checkpoint")
# Define updateFunc: add the new values for a key to its previous running sum
# (last_sum is None the first time a key is seen)
def updateFunc(new_values, last_sum):
    return sum(new_values) + (last_sum or 0)
# Create DStream that will connect to the stream of input lines from connection to localhost:9999
lines = ssc.socketTextStream("localhost", 9999)
# Calculate running counts
# Line 1: Split lines into words
# Line 2: Count each word in each batch
# Line 3: Run `updateStateByKey` to maintain the running count
running_counts = lines.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .updateStateByKey(updateFunc)
# Print the first ten elements of each RDD generated in this stateful DStream to the console
running_counts.pprint()
# Start the computation
ssc.start()
# Wait for the computation to terminate
ssc.awaitTermination()
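To make the role of updateFunc concrete, the following minimal sketch calls it directly, outside of Spark. The argument values are illustrative assumptions about what one key might receive in a batch: a list of the new 1s emitted for that key, plus the previous running sum (or None if the key has no state yet).

```python
# Same update function as in the listing above
def updateFunc(new_values, last_sum):
    return sum(new_values) + (last_sum or 0)

# Illustrative inputs (assumed, not captured from a real run)
first_seen = updateFunc([1, 1, 1, 1, 1], None)  # key with no prior state
seen_again = updateFunc([1, 1], 3)              # two new values on top of a sum of 3
no_new_data = updateFunc([], 5)                 # no new values: the sum is carried over

print(first_seen, seen_again, no_new_data)  # prints: 5 5 5
```

The `(last_sum or 0)` expression is what lets the same function handle both the first appearance of a key and every later update.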
To run this PySpark Streaming application, execute the following command from your $SPARK_HOME folder:
./bin/spark-submit stateful_streaming_word_count.py localhost 9999
Time the steps as follows:
- First, start Netcat in one terminal: nc -lk 9999.
- Then, in a second terminal, start your PySpark Streaming application: ./bin/spark-submit stateful_streaming_word_count.py localhost 9999.
- Then, start typing your events in the Netcat terminal, for example:
  - For the first second, type blue blue blue blue blue green green green
  - For the second second, type gohawks
  - Wait a second; for the fourth second, type green green
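Typing events on a one-second cadence by hand is awkward, so a small script can stand in for nc. The sketch below is one possible approach, not part of the original example: open_listener and send_batches are hypothetical helper names, an empty string means "send nothing for this interval", and the pauses are not synchronized with Spark's batch boundaries, so the events may land in different batches than intended.

```python
import socket
import time

def open_listener(host="localhost", port=9999):
    """Open a listening TCP socket, playing the role of `nc -lk 9999`."""
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    srv.bind((host, port))
    srv.listen(1)
    return srv

def send_batches(srv, batches, interval=1.0):
    """Wait for one client, then send one line per interval.

    An empty string in `batches` means nothing is sent for that interval.
    """
    conn, _ = srv.accept()  # blocks until the streaming application connects
    with conn:
        for line in batches:
            if line:
                conn.sendall((line + "\n").encode("utf-8"))
            time.sleep(interval)
```

For example, calling send_batches(open_listener(), ["blue blue blue blue blue green green green", "gohawks", "", "green green"]) in place of the nc step, and then starting the streaming application, would replay the events listed above.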
The console output from your PySpark Streaming application will look similar to the following:
$ ./bin/spark-submit stateful_streaming_word_count.py localhost 9999
-------------------------------------------
Time: 2018-06-21 23:00:30
-------------------------------------------
(u'blue', 5)
(u'green', 3)
-------------------------------------------
Time: 2018-06-21 23:00:31
-------------------------------------------
(u'blue', 5)
(u'green', 3)
(u'gohawks', 1)
-------------------------------------------
Time: 2018-06-21 23:00:32
-------------------------------------------
-------------------------------------------
Time: 2018-06-21 23:00:33
-------------------------------------------
(u'blue', 5)
(u'green', 5)
(u'gohawks', 1)
-------------------------------------------
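The running totals in this output can be traced by hand. The sketch below reproduces them in plain Python with collections.Counter, which is swapped in here for updateStateByKey purely for illustration. The function name trace_running_counts is hypothetical, an empty string stands for an interval with no input, and the sketch models only the accumulated state, not what pprint() writes to the console for each batch.

```python
from collections import Counter

def trace_running_counts(batches):
    """Return the accumulated word counts after each batch of input."""
    running = Counter()
    snapshots = []
    for line in batches:
        running.update(line.split())     # add this batch's words to the totals
        snapshots.append(dict(running))  # copy of the state after this batch
    return snapshots

snapshots = trace_running_counts([
    "blue blue blue blue blue green green green",  # first second
    "gohawks",                                     # second second
    "",                                            # third second: no input
    "green green",                                 # fourth second
])
for state in snapshots:
    print(state)
```

The final snapshot holds blue at 5, green at 5, and gohawks at 1, matching the last batch shown above.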
To end the streaming application (and the nc session, for that matter), press Ctrl + C in each terminal.