Recall that a transformation takes an existing RDD and transforms it into one or more output RDDs. Transformations are also lazy: no work is initiated until an action is executed. In the following join example, the action is the take() function:
# Flights data keyed by the origin airport code
# e.g. (u'JFK', u'01010900')
flt = flights.map(lambda c: (c[3], c[0]))
# Airports data keyed by the airport code
# e.g. (u'JFK', u'NY')
air = airports.map(lambda c: (c[3], c[1]))
# Execute inner join between RDDs
flt.join(air).take(5)
# Output
[(u'JFK', (u'01010900', u'NY')),
 (u'JFK', (u'01011200', u'NY')),
 (u'JFK', (u'01011900', u'NY')),
 (u'JFK', (u'01011700', u'NY')),
 (u'JFK', (u'01010800', u'NY'))]
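The shape of the output above — (key, (left_value, right_value)) — reflects inner-join semantics over key-value pairs. This can be sketched in plain Python with toy data (a conceptual sketch of the semantics, not Spark's implementation; the sample records are hypothetical):

```python
from collections import defaultdict

# Toy (key, value) pairs mimicking the two mapped RDDs above
flt = [("JFK", "01010900"), ("JFK", "01011200"), ("SEA", "01010705")]
air = [("JFK", "NY"), ("SEA", "WA"), ("BOS", "MA")]

# Index the right side by key, then emit every left/right match
right = defaultdict(list)
for k, v in air:
    right[k].append(v)

joined = [(k, (lv, rv)) for k, lv in flt for rv in right[k]]
# Keys present on only one side (e.g. BOS) are dropped, as in an inner join
```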
To better understand what happens when this join runs, let's review the Spark UI. Every Spark session launches a web-based UI, by default on port 4040 (for example, http://localhost:4040), which includes the following information:
- A list of scheduler stages and tasks
- A summary of RDD sizes and memory usage
- Environmental information
- Information about the running executors
For more information, please refer to the Apache Spark Monitoring documentation page at https://spark.apache.org/docs/latest/monitoring.html.
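If port 4040 is already taken (for instance, by another application on the same host), Spark binds the UI to the next available port. The port can also be set explicitly through the standard spark.ui.port configuration property, for example in spark-defaults.conf (a config fragment; the values shown are illustrative):

```
spark.ui.port           4041
spark.eventLog.enabled  true
```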
As can be seen in the following DAG visualization, the join statement and the two preceding map transformations form a single job (Job 24) that created two stages (Stage 32 and Stage 33):
Let's dive deeper into these two stages:
To better understand the tasks executed in the first stage (Stage 32), we can dive deeper into the stage's DAG Visualization as well as its Event Timeline:
- The two textFile callouts extract the two source files (departuredelays.csv and airport-codes-na.txt)
- Once the map functions complete, Spark executes the UnionRDD and PairwiseRDD operations, which lay the groundwork for the join as part of the union task
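The UnionRDD/PairwiseRDD step can be pictured in plain Python: tag each record with its side, union the two datasets, group by key, then emit the cross product of the two sides per key. This is a conceptual sketch of cogroup-style join mechanics with hypothetical toy data, not Spark's actual code:

```python
from collections import defaultdict

flt = [("JFK", "01010900"), ("SEA", "01010705")]
air = [("JFK", "NY"), ("SEA", "WA")]

# Union the two sides, tagging each record with its origin (0=left, 1=right)
tagged = [(k, (0, v)) for k, v in flt] + [(k, (1, v)) for k, v in air]

# Group values by key into a (left_values, right_values) pair
groups = defaultdict(lambda: ([], []))
for k, (side, v) in tagged:
    groups[k][side].append(v)

# Emit the per-key cross product: the inner-join result
joined = [(k, (l, r)) for k, (ls, rs) in groups.items() for l in ls for r in rs]
```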
In the next stage, the partitionBy and mapPartitions tasks shuffle and re-map the partitions before the output is returned via the take() function:
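The effect of partitionBy can be sketched with simple hash partitioning: records that share a key always land in the same partition, which is what allows the subsequent per-partition work to join matching keys locally. A conceptual sketch with hypothetical records:

```python
# Hash-partition (key, value) records into a fixed number of partitions;
# records with the same key always land in the same partition.
num_partitions = 4
partitions = [[] for _ in range(num_partitions)]

records = [("JFK", "01010900"), ("SEA", "01010705"), ("JFK", "NY")]
for k, v in records:
    partitions[hash(k) % num_partitions].append((k, v))
```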
Recall that without an action, no job is executed. For example, if you were to execute the following code snippet, note that the output is merely a pointer to a Python RDD:
# Same join statement as above, but without an action operation such as take()
flt = flights.map(lambda c: (c[3], c[0]))
air = airports.map(lambda c: (c[3], c[1]))
flt.join(air)
# Output
Out[32]: PythonRDD[101] at RDD at PythonRDD.scala:50
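The laziness itself can be illustrated with a plain-Python analogy (not Spark code): Python's built-in map builds a pipeline without running it, much as transformations build a lineage, and only consuming the pipeline — the "action" — triggers the work.

```python
log = []

def trace(x):
    # Record that the function actually ran
    log.append(x)
    return x * 2

pipeline = map(trace, [1, 2, 3])  # "transformation": nothing has executed yet
assert log == []                  # no work done so far
result = list(pipeline)           # "action": the pipeline now runs
```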