Chapter 6. Loading and Saving Your Data

In chapters 4 and 5, we discussed internal (built-in) and external Data Sources: how Spark reads from and writes to these sources. While in chapter 4 we explored the internal Data Sources (see the section “Data Sources for DataFrames and SQL Tables”), we did not cover how to organize data when writing it to disk.

In this chapter, we discuss strategies for organizing data for storage, such as partitioning and bucketing, along with compression schemes, splittable and non-splittable files, and Parquet files.

Both engineers and data scientists will find parts of this chapter useful as they evaluate which storage format is best suited for downstream consumption by future Spark jobs that use the saved data.

Motivation for Data Sources

Spark’s ability to interact with many data sources—internal and external—extends its functionality to the larger Hadoop ecosystem. Built upon the low-level InputFormat and OutputFormat interfaces used by Hadoop MapReduce, Spark’s high-level Data Source V2 APIs1 can connect to these data sources. For example, Spark can access storage systems such as S3, Azure Blob, HDFS, NoSQL, etc., giving Spark and its developers immense flexibility to access myriad data sources for data analytics.

Spark supports three sets of data sources:

  • File formats and filesystems

  • Structured data sources

  • Databases and NoSQL (key/value stores)

We covered some structured data sources in chapter 4 and common databases and key/value data sources in Chapter 5. However, for brevity, we did not cover text files or how to organize files such as Parquet for efficient layout and faster access.

File Formats: Revisited

Spark makes it simple to load and save data in a number of different file formats. Let’s examine some of these file formats.

Text Files

Text files are simple to load and save with Spark—simple because each line in the text file becomes a record of type String in a DataFrame or Dataset, even though a text line may contain other data types (for example, integers or floats), and simple because Spark is relieved from inferring a schema and you from providing one. However, any different data types present within a line will be read as part of a single string record.

Server logs are good examples of text files, with different data types separated by a delimiter, normally a tab or space, and so are plain text files such as README.txt.

Reading Text Files

Reading is simple. Use an instance of a SparkSession and DataFrameReader to read a text file into a DataFrame in Python or Dataset in Scala.

In Python:

lines_df = spark.read.text("/databricks-datasets/SPARK_README.md")
lines_df.show(n = 10, truncate = False)

In Scala:
val linesDF = spark.read.text("/databricks-datasets/SPARK_README.md")
linesDF.show(10, false)

+------------------------------------------------------------------------------+
|value                                                                         |
+------------------------------------------------------------------------------+
|# Apache Spark                                                                |
|                                                                              |
|Spark is a fast and general cluster computing system for Big Data. It provides|
|high-level APIs in Scala, Java, Python, and R, and an optimized engine that   |
|supports general computation graphs for data analysis. It also supports a     |
|rich set of higher-level tools including Spark SQL for SQL and DataFrames,    |
|MLlib for machine learning, GraphX for graph processing,                      |
|and Spark Streaming for stream processing.                                    |
|                                                                              |
|<http://spark.apache.org/>                                                    |
+------------------------------------------------------------------------------+
only showing top 10 rows

Once loaded into a DataFrame or Dataset, you can transform the lines of text. For example, let’s filter out all lines but the ones with the word Spark in them.

In Scala:

linesDF.filter($"value".contains("Spark")).show(5, false)

In Python:

from pyspark.sql.functions import col

lines_df.filter(col("value").contains("Spark")).show(5, truncate=False)

+------------------------------------------------------------------------------+
|value                                                                         |
+------------------------------------------------------------------------------+
|# Apache Spark                                                                |
|Spark is a fast and general cluster computing system for Big Data. It provides|
|rich set of higher-level tools including Spark SQL for SQL and DataFrames,    |
|and Spark Streaming for stream processing.                                    |
|You can find the latest Spark documentation, including a programming          |
+------------------------------------------------------------------------------+
only showing top 5 rows

A common non-trivial use case of processing text files with Spark is analyzing server log files—for example, an Apache log file. Consider a short sample of a synthetically generated Apache log file 2. (See the example Scala and Python notebooks in the Learning Spark 2nd Repo, Chapter 7.)

161.239.128.119 - - [15/Jul/2019:06:44:31 -0700] "DELETE /apps/cart.jsp?appID=3419 HTTP/1.0" 200 4976 "http://baker-kennedy.org/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_1) AppleWebKit/5340 (KHTML, like Gecko) Chrome/13.0.849.0 Safari/5340"
44.137.30.10 - - [15/Jul/2019:06:47:22 -0700] "GET /search/tag/list HTTP/1.0" 200 5033 "http://wilson.net/blog/explore/posts/privacy/" "Mozilla/5.0 (Macintosh; U; PPC Mac OS X 10_5_9; rv:1.9.6.20) Gecko/2016-06-05 13:21:30 Firefox/3.6.13"
172.55.193.211 - - [15/Jul/2019:06:48:45 -0700] "PUT /search/tag/list HTTP/1.0" 200 5100 "http://thomas.com/search/categories/about/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_6_3; rv:1.9.4.20) Gecko/2014-07-11 21:43:54 Firefox/3.8"
...
...

Even though each line in the Apache log comprises semi-structured data of different types—string, timestamp, integer, long, etc.—each line is read as a single string. You’ll have to parse each line and extract its respective data types for further analytics. Better yet, create a DataFrame or Dataset as a collection of records comprising these extracted data types.

The Scala code below converts a Dataset[String] to Dataset[ApacheLogRecord]. 

import spark.implicits._       // needed for .as[String] and Dataset encoders

val accessLogPath = ...        // path to your Apache log file
val apacheLogDS = spark.read.text(accessLogPath).as[String]

// create a case class that will represent a record for each line
case class ApacheLogRecord(ip: String, client: String, user: String, date: String,
  cmd: String, request: String, proto: String, status: Int, bytes: Long,
  referrer: String, userAgent: String)

// define a regular expression to parse the log line
val pattern = """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\S+) "(\S+)" "([^"]*)"""".r

// create a function to parse a log line into an ApacheLogRecord
def parseLogLine(line: String): ApacheLogRecord = {
  val pattern(ip, client, user, date, cmd, request, proto, status, bytes, referrer, userAgent) = line
  ApacheLogRecord(ip, client, user, date, cmd, request, proto, status.toInt, bytes.toLong, referrer, userAgent)
}

// parse and convert each line into an ApacheLogRecord
val apacheRecordsDS = apacheLogDS.map(line => parseLogLine(line))
apacheRecordsDS.drop("client", "user").show(3, false)

+---------------+--------------------------+------+-------------------------+--------+------+-----+---------------------------------------------+------------------------------------------------------------------------------------------------------------------+
|ip             |date                      |cmd   |request                  |proto   |status|bytes|referrer                                     |userAgent                                                                                                           |
+---------------+--------------------------+------+-------------------------+--------+------+-----+---------------------------------------------+------------------------------------------------------------------------------------------------------------------+
|161.239.128.119|15/Jul/2019:06:44:31 -0700|DELETE|/apps/cart.jsp?appID=3419|HTTP/1.0|200   |4976 |http://baker-kennedy.org/                    |Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_1) AppleWebKit/5340 (KHTML, like Gecko) Chrome/13.0.849.0 Safari/5340   |
|44.137.30.10   |15/Jul/2019:06:47:22 -0700|GET   |/search/tag/list         |HTTP/1.0|200   |5033 |http://wilson.net/blog/explore/posts/privacy/|Mozilla/5.0 (Macintosh; U; PPC Mac OS X 10_5_9; rv:1.9.6.20) Gecko/2016-06-05 13:21:30 Firefox/3.6.13               |
|172.55.193.211 |15/Jul/2019:06:48:45 -0700|PUT   |/search/tag/list         |HTTP/1.0|200   |5100 |http://thomas.com/search/categories/about/   |Mozilla/5.0 (Macintosh; Intel Mac OS X 10_6_3; rv:1.9.4.20) Gecko/2014-07-11 21:43:54 Firefox/3.8                   |
+---------------+--------------------------+------+-------------------------+--------+------+-----+---------------------------------------------+------------------------------------------------------------------------------------------------------------------+
only showing top 3 rows

With our new Dataset[ApacheLogRecord], we can conduct a number of Spark operations on it. For example, we could issue the following queries:

  • What is the most common URL accessed?

  • What is the most common referrer?

  • What are the most common response codes?

  • Which requests served the largest number of bytes?

(For Python code of these queries, we refer you to the Python notebook in the Learning Spark, 2nd GitHub, Chapter 7. 3)

Queries 1 and 2:

import org.apache.spark.sql.functions.desc

apacheRecordsDS.select("request", "referrer")
  .groupBy("request", "referrer")
  .count()
  .orderBy(desc("count"))
  .show(10, false)

+--------------------+---------------------+-----+
|request             |referrer             |count|
+--------------------+---------------------+-----+
|/app/main/posts     |http://smith.com/    |19   |
|/wp-admin           |http://smith.com/    |16   |
|/search/tag/list    |http://smith.com/    |15   |
|/posts/posts/explore|http://www.smith.com/|13   |
|/explore            |http://www.brown.com/|13   |
|/posts/posts/explore|http://johnson.com/  |12   |
|/wp-content         |http://smith.com/    |12   |
|/wp-content         |http://www.smith.com/|12   |
|/explore            |http://www.smith.com/|11   |
|/explore            |http://johnson.com/  |11   |
+--------------------+---------------------+-----+
only showing top 10 rows

Query 3:

import org.apache.spark.sql.functions.desc

apacheRecordsDS.select("status")
  .groupBy("status").count()
  .orderBy(desc("count"))
  .show(10, false)

+------+-----+
|status|count|
+------+-----+
|200   |90079|
|404   |4029 |
|301   |3891 |
|500   |2001 |
+------+-----+

Query 4:

import org.apache.spark.sql.functions.desc

apacheRecordsDS.select("request", "bytes")
  .orderBy(desc("bytes"))
  .show(5, false)

+-------------------------+-----+
|request                  |bytes|
+-------------------------+-----+
|/apps/cart.jsp?appID=3881|5228 |
|/app/main/posts          |5208 |
|/explore                 |5205 |
|/explore                 |5200 |
|/app/main/posts          |5199 |
+-------------------------+-----+
only showing top 5 rows

Now that we have a cleansed Dataset[ApacheLogRecord], what can we do with it?

Most data engineers will want to preserve this transformed data for future analysis. We suggest a couple of options. One is to create a managed or unmanaged SQL table (see the section “Creating SQL Databases and Tables” in chapter 4) for future Spark applications to query.

Another option is to save it as a Parquet file. Although you could save it as a text file, doing so defeats the purpose for this use case, especially after you have parsed the lines of Apache log text and extracted their respective data types. We suggest saving it in the Parquet data format, an efficient columnar storage format, which we discuss below in the section “Saving as Parquet Files.”
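
For example, here is a minimal sketch of both options (the table name and the output path below are illustrative, not part of the earlier example):

In Scala:

// Option 1: persist the cleansed Dataset as a managed SQL table in the metastore
apacheRecordsDS.write
  .mode("overwrite")
  .saveAsTable("apache_logs_tbl")

// Option 2: persist it as Parquet files at a path of your choosing
apacheRecordsDS.write
  .mode("overwrite")
  .parquet("/tmp/apache-logs.parquet")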

Writing Text Files

Saving text files is just as simple as reading them. Continuing our first example above, we can save the filtered README, keeping only the lines that contain the word Spark.

In Scala:

linesDF.filter($"value".contains("Spark"))
  .write.text("/storage/SPARK_README.md")

In Python:

lines_df.filter(col("value").contains("Spark")) \
    .write.text("/storage/SPARK_README.md")

The write operation will result in multiple part files under the directory /storage/SPARK_README.md:

_SUCCESS
_committed_447813250458265774
_started_447813250458265774
part-00000-tid-447813250458265774-4b1e09fc-eba0-4e08-b476-e2f6f4b429cc-624-1-c000.txt

Depending on the size of SPARK_README.md and the Spark configuration spark.sql.files.maxPartitionBytes, multiple part-*.txt files will be created, and they can all be read back at once with spark.read.text("/storage/SPARK_README.md"), or with a wildcard such as spark.read.text("/storage/SPARK_README.md/part-*.txt").
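
As a rough sketch of how that configuration and the read-back interact (the 64 MB value here is purely illustrative):

In Scala:

// lower the maximum number of bytes Spark packs into a single read partition (default is 128 MB)
spark.conf.set("spark.sql.files.maxPartitionBytes", (64 * 1024 * 1024).toString)

// pointing the reader at the output directory picks up all of its part files
val reloadedDF = spark.read.text("/storage/SPARK_README.md")
println(reloadedDF.rdd.getNumPartitions)   // number of read partitions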

Note

A call such as spark.read.text("/logs/web-apache-*.log") expands the wildcard and reads all files matching that pattern. In cases where a directory contains multiple server log files created during the day, you can use pattern matching to load them all at once.

Organizing Data for Efficient I/O

Spark offers data engineers two efficient ways to organize data for storage so that future Spark jobs can operate in parallel and read only relevant data rather than read the entire dataset—that is, avoid a full-table or full-file scan for certain query operations such as select or filter.

Two organizing strategies achieve this I/O efficiency: partitioning files and bucketing (or clustering) data by column names in the DataFrame or Dataset. For those familiar with Apache Hive, the concept is similar (but not equivalent) to Hive partitions, where data is organized under named directories corresponding to named columns in the Hive metastore.

Partitioning

By design, to achieve maximum parallelism, a Spark Executor assigns each of its cores to a Spark Task, and each Task reads one partition of a file, as shown in Figure 6-1. No two Tasks assigned to Executor cores read the same data from the same partition. That is, each core with an assigned Spark Task reads a separate, distinct partition. Hence, more partitions lead to more parallelism in Spark. (In chapter 8, we will discuss strategies to size the number of partitions based on how many cores are available on the Spark Executors.)

Figure 6-1. Cores to Tasks to data partitioning relationships on Spark Executors
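
As a quick check before writing, you can inspect how many partitions Spark currently holds for a DataFrame or Dataset; a small sketch using the Apache log Dataset from earlier:

In Scala:

// number of in-memory partitions, and hence the upper bound on parallel Tasks for this Dataset
println(apacheRecordsDS.rdd.getNumPartitions)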

When saving data files, you can control how many partitions to create from your DataFrame or Dataset. For our Apache log Dataset[ApacheLogRecord], the following code snippet will create five partitions.

In Scala:

apacheRecordsDS.repartition(5)
  .write.format("parquet")
  .mode("overwrite")
  .save("/tmp/web-1-apache.logs.parquet")

This code will create five partitions of roughly equal size in the directory /tmp/web-1-apache.logs.parquet. For brevity, we list only the partition files and their sizes in bytes. By default, Parquet output is compressed with an efficient compression algorithm called Snappy.

part-00000-tid-971498833024559610-4babb082-1726-4d53-9d1d-e097fffdc4a7-648-1-c000.snappy.parquet 1231810
part-00001-tid-971498833024559610-4babb082-1726-4d53-9d1d-e097fffdc4a7-649-1-c000.snappy.parquet 1233549
part-00002-tid-971498833024559610-4babb082-1726-4d53-9d1d-e097fffdc4a7-650-1-c000.snappy.parquet 1230315
part-00003-tid-971498833024559610-4babb082-1726-4d53-9d1d-e097fffdc4a7-651-1-c000.snappy.parquet 1230700
part-00004-tid-971498833024559610-4babb082-1726-4d53-9d1d-e097fffdc4a7-652-1-c000.snappy.parquet 1232347
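
A future Spark job can read the saved data back by pointing the Parquet reader at the directory; a minimal sketch (note that the number of read partitions is governed by the file sizes and spark.sql.files.maxPartitionBytes, so it need not be exactly five):

In Scala:

val savedLogsDF = spark.read.format("parquet")
  .load("/tmp/web-1-apache.logs.parquet")
savedLogsDF.show(3, false)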

Another way to control partitioning is to organize and store data by common field names; that is, you control the directory names and structure. For example, you can partition data by a particular column or field if that field is frequently accessed in queries. To illustrate, let’s partition our Apache log Dataset[ApacheLogRecord] by date and ip, since we often may want to query records for a given ip on a given date.

apacheRecordsDS
  .write
  .format("parquet")
  .mode("overwrite")
  .partitionBy("date","ip")
  .save("/tmp/web-1-apache.date-ip.logs.parquet")

This will generate folders of the form date=<distinct date>, each with subfolders of the form ip=<distinct IP address>, and the respective partition files under each folder. When you select or filter on date in a query, Spark will look only in the directories for that date partition, thereby avoiding excessive reads.

Let’s look at the directory listing of "/tmp/web-1-apache.date-ip.logs.parquet":

date=15/Jul/2019:06:31-0700/
date=15/Jul/2019:06:47:22-0700/
...

Under a folder such as date=15/Jul/2019:06:31-0700/ip=161.239.128.119, you’ll find its partition files:

part-00000-tid-7724829523200203126-2ced3143-ca6e-4d11-93b6-b5bd2e623be9-717-1.c000.snappy.parquet 3814

The number 3814 next to the file is the size of that partition file in bytes. By default, the maximum size of a Spark partition is 128 MB, but you can change the default through spark.conf.set("spark.sql.files.maxPartitionBytes", <new value>). (We will cover these configurations in the next chapter.)

Creating partitions with multiple column names enables Spark to quickly answer filter queries against data associated only with a specific date and a specific ip address, skipping the rest of the directories. Partitioning by named columns in this way enables the partition pruning and data skipping techniques that make Spark queries against partitioned data performant.4
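
To see this in action, filter on the partition columns when reading the partitioned directory back. A small sketch (the date and ip values are taken from the sample log lines above and are only illustrative):

In Scala:

import org.apache.spark.sql.functions.col

// Spark scans only the date=.../ip=... directories that match the filter
spark.read.parquet("/tmp/web-1-apache.date-ip.logs.parquet")
  .filter(col("date") === "15/Jul/2019:06:44:31 -0700" && col("ip") === "161.239.128.119")
  .show(3, false)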

In Chapter 8, Optimizing, Tuning and Debugging Apache Spark, we’ll reexamine configuration aspects of these optimization tips and techniques.

Bucketing

Supported only for Spark managed tables, this data layout scheme pre-shuffles and optionally pre-sorts the data while creating a Spark managed table. Data within each bucket is grouped or clustered into its own physical partition. This is extremely efficient when you are doing large joins and aggregations, since similar data is grouped or sorted within a single partition on a single node, reducing the amount of data to be shuffled across or between nodes 5. In other words, we reduce the number of wide transformations, shuffles, or exchanges (see the “Narrow and Wide Transformations” section in chapter 2) needed during a groupBy or sortBy operation.

For Queries 1 and 2 above, we can bucket the results of that query as a managed Spark table with the following code:

apacheRecordsDS.select("request", "referrer")
  .groupBy("request","referrer")
  .count()
  .orderBy(desc("count"))
  .write.format("parquet")
  .bucketBy(5, "referrer","request", "count")
  .saveAsTable("apache_log_referrer_tbl")

Since tables in Spark are really files on disk plus associated metadata created under /user/hive/warehouse, the buckets with the same tid (buckets are assigned tids) are created under the directory /user/hive/warehouse/apache_log_referrer_tbl/:

part-00000-tid-7839129643710671878-5ee66a1e-9b12-4c86-b8f7-7f9bcb845ab0-2667-1_00000.c000.snappy.parquet 2350
part-00000-tid-7839129643710671878-5ee66a1e-9b12-4c86-b8f7-7f9bcb845ab0-2667-2_00001.c000.snappy.parquet 2365
part-00000-tid-7839129643710671878-5ee66a1e-9b12-4c86-b8f7-7f9bcb845ab0-2667-3_00002.c000.snappy.parquet 2316
part-00000-tid-7839129643710671878-5ee66a1e-9b12-4c86-b8f7-7f9bcb845ab0-2667-4_00003.c000.snappy.parquet 2369
part-00000-tid-7839129643710671878-5ee66a1e-9b12-4c86-b8f7-7f9bcb845ab0-2667-5_00004.c000.snappy.parquet 2164

To examine the detailed “bucketed” properties of the bucketed table apache_log_referrer_tbl, you can issue a Spark SQL query such as:

spark.sql("DESCRIBE FORMATTED apache_log_referrer_tbl").show(truncate=False)

+----------------------------+--------------------------------------------------+-------+
|col_name                    |data_type                                         |comment|
+----------------------------+--------------------------------------------------+-------+
|request                     |string                                            |null   |
|referrer                    |string                                            |null   |
|count                       |bigint                                            |null   |
|                            |                                                  |       |
|# Detailed Table Information|                                                  |       |
|Database                    |default                                           |       |
|Table                       |apache_log_referrer_tbl                           |       |
|Owner                       |root                                              |       |
|Created Time                |Tue Jul 23 21:04:10 UTC 2019                      |       |
|Last Access                 |Thu Jan 01 00:00:00 UTC 1970                      |       |
|Created By                  |Spark 2.4.3                                       |       |
|Type                        |MANAGED                                           |       |
|Provider                    |parquet                                           |       |
|Num Buckets                 |5                                                 |       |
|Bucket Columns              |[`referrer`, `request`, `count`]                  |       |
|Sort Columns                |[]                                                |       |
|Table Properties            |[transient_lastDdlTime=1563915850]                |       |
|Location                    |dbfs:/user/hive/warehouse/apache_log_referrer_tbl |       |
|Serde Library               |org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe|       |
|InputFormat                 |org.apache.hadoop.mapred.SequenceFileInputFormat  |       |
+----------------------------+--------------------------------------------------+-------+
only showing top 20 rows
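
Future Spark jobs can read the bucketed table back by name with spark.table(); a minimal sketch:

In Scala:

import org.apache.spark.sql.functions.desc

// load the bucketed managed table and look at the most frequent request/referrer pairs
val referrerTbl = spark.table("apache_log_referrer_tbl")
referrerTbl.orderBy(desc("count")).show(5, false)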

Compression Schemes

An additional strategy to consider when saving certain types of files for future Spark jobs to read efficiently is selecting an appropriate compression algorithm. Two factors to weigh when choosing a compression algorithm are 1) the compression ratio (the higher the better) and 2) the compression and decompression speed (the faster the better).

The output compression algorithm you choose can have a big impact on reading efficiency for future consumers of the data. Table 6-1 lists some common compression algorithms and their properties.

Format   Splittable   Average compression speed   Effectiveness on text files   Native
Gzip*    No           Fast                        High                          Yes
lzo      Yes          Very Fast                   Medium                        Yes
bzip2    Yes          Slow                        Very High                     Yes
zlib     No           Slow                        Medium                        Yes
snappy   Yes          Very Fast                   Low                           Yes

* Gzip-compressed data is splittable only when used within Parquet files, since Parquet compresses individual column chunks rather than the file as a whole.
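
When writing Parquet, you can choose a codec from the table either per write or for the whole session; a small sketch (the output path is illustrative):

In Scala:

// per write: set the codec on the DataFrameWriter
apacheRecordsDS.write
  .option("compression", "gzip")       // e.g., "snappy" (the default), "gzip", "none"
  .mode("overwrite")
  .parquet("/tmp/web-1-apache.logs.gzip.parquet")

// or set a session-wide default codec for all Parquet output
spark.conf.set("spark.sql.parquet.compression.codec", "gzip")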

Splittable and Non-splittable Files

When choosing a compression scheme, you want to ensure that the compressed files are splittable into partitions. By now you are aware that Spark maximizes parallelism by assigning a Spark Task, and a partition, to each Spark Executor core. So if you have a huge file, say larger than 1 GB, that you have compressed using zlib or gzip, then downstream only a single Spark Task can be assigned to read that single non-splittable compressed file, while the rest of the Executors’ cores remain idle. Since this affects parallelism, make sure to use a compression algorithm that generates splittable compressed files, unless you want to create or split the files yourself.

Columnar Formats

Finally, compressed files stored in columnar formats such as ORC or Parquet are the most efficient for big data analytics. These formats are optimized for analytics because they support nested data types, allow data skipping and column pruning, and store min/max statistics for predicate pushdown.

Saving as Parquet Files

Parquet is Apache Spark’s preferred output format and the best choice for Spark SQL. We encourage you to use the Parquet format to save your files after you have completed your extract, transform, and load (ETL) steps.

Aside from fast compression, splittable files, and a columnar layout for efficient retrieval, Parquet is the de facto storage format for Delta Lake, an open-source storage layer for building reliable data lakes. Delta Lake brings ACID (atomicity, consistency, isolation, and durability) properties to Apache Spark.6

Delta Lake Storage Format

The Delta Lake storage format leverages the efficient compression schemes that are native to Parquet. As such, you can easily convert your existing Parquet files into the Delta Lake format using the Spark APIs.

Delta Lake Table

Converting an existing Parquet file into the Delta Lake format is simple:

In Python:

apache_logs_df = spark.read.format("parquet").load("/tmp/web-1-apache.logs.parquet")
apache_logs_df.write.format("delta").save("/tmp/web-1-apache.delta")

In Scala:

val apacheLogDF = spark.read.format("parquet").load("/tmp/web-1-apache.logs.parquet")
apacheLogDF.write.format("delta").save("/tmp/web-1-apache.delta")

Now that you have converted your Parquet file format into Delta Lake format, you can just as easily create a Delta Lake Table:

spark.sql("DROP TABLE IF EXISTS apache_logs_delta_tbl")

spark.sql("CREATE TABLE apache_logs_delta_tbl USING delta LOCATION '/tmp/web-1-apache.delta'")

Just as you can query any Spark managed or unmanaged table, you can query a Delta Lake table with Spark SQL. Think of it as just another Data Source for the Spark APIs to connect to, except that with Delta Lake tables the operations adhere to ACID properties.

Here are some example queries against the Delta Lake table using Spark SQL:

// show selected details for the Delta Lake table
spark.sql("describe detail apache_logs_delta_tbl")
  .select("format", "name", "location", "createdAt")
  .show(4, false)
+------+-----------------------------+----------------------------+-----------------------+
|format|name                         |location                    |createdAt              |
+------+-----------------------------+----------------------------+-----------------------+
|delta |default.apache_logs_delta_tbl|dbfs:/tmp/web-1-apache.delta|2019-07-18 00:23:37.558|
+------+-----------------------------+----------------------------+-----------------------+

// query the Delta Lake table 
spark.sql("select status, count(*) from apache_logs_delta_tbl group by status order by status desc").show(5, false)

+------+--------+
|status|count(1)|
+------+--------+
|500   |2001    |
|404   |4029    |
|301   |3891    |
|200   |90079   |
+------+--------+

While this short section is only a peek into Delta Lake, in chapter 12 we’ll explore in detail Delta Lake’s and Apache Spark’s potential to build reliable data lakes and examine Delta Lake features.

Summary

This chapter covered additional Data Sources—Text, Parquet, and Delta Lake—that we did not cover in chapter 4. In particular, we explored how to efficiently save files so that downstream Spark jobs can quickly read them.

We looked at data organization and directory layout schemes such as partitioning and bucketing, discussed the reasons for choosing the right compression algorithm, and why columnar format, in particular Parquet, is best suited for big data analytics. While partitioning of files can maximize parallelism for future Spark jobs, bucketing can reduce (or remove) shuffles during large table joins on bucketed and sorted columns.

Finally, we peeked at an open-source Delta Lake storage format layer, which we will cover in chapter 12, and how Delta Lake seamlessly works with Apache Spark APIs, bringing ACID properties to Spark.

In the next chapter, we’ll look at how to optimize Spark by tuning Spark configurations and what attributes and signals to look for while debugging Spark.
