Chapter 5: Data Layouts

Data analysis is a common practice for making data-driven decisions that accelerate and grow your business, organization, teams, and more. In a typical analysis process, queries that process and aggregate the records in your datasets are run against your data to understand business trends. These queries are commonly run from Business Intelligence (BI) dashboard tools, web applications, automated tools, and more. You then obtain the results you need, such as user subscriptions, marketing reports, sales trends, and more.

Analytic query performance matters because the people running these queries need timely access to the analysis results in order to make business decisions quickly. To accelerate query performance, you need to pay attention to your dashboard tools, the computation engine that processes large amounts of data, the data layout design of your data, its data storage, and more. The combination of these resources affects your analytic query performance, so it's important to understand them.

This chapter focuses on how to design data layouts that optimize your analytic workloads. In particular, to design data layouts that maximize query performance, we need to consider three important parts: key techniques for storing data to optimize query performance, how we manage our files, and how we optimize our Amazon S3 storage.

By focusing on these three parts, we will learn useful, general techniques to accelerate your analytic workloads, as well as important functionalities for optimizing those workloads that can be achieved using AWS Glue and Lake Formation.

In this chapter, we will cover the following topics:

  • Why do we need to pay attention to data layout?
  • Key techniques to optimally storing data
  • Optimizing the number of files and each file size
  • Optimizing your storage by working with Amazon S3

Technical requirements

For this chapter, if you wish to follow some of the walk-throughs, you will require the following:

  • Access to GitHub, S3, and the AWS console (specifically AWS Glue, AWS Lake Formation, and Amazon S3)
  • A computer with the Chrome, Firefox, Safari, or Microsoft Edge browser and the AWS Command-Line Interface (AWS CLI) installed
  • An AWS account and an accompanying IAM user (or IAM role) with sufficient privileges to complete this chapter’s activities. We recommend using a minimally scoped IAM policy to avoid unnecessary usage and operational mistakes. You can get the IAM policy for this chapter from the relevant GitHub repository at https://github.com/PacktPublishing/Serverless-ETL-and-Analytics-with-AWS-Glue/blob/main/Chapter05/data.json. This IAM policy includes the following access:
    • Permissions to create and list IAM roles and policies for creating a service role for an AWS Glue ETL job
    • Permissions to read, list, and write to an Amazon S3 bucket
    • Permissions to read and write Glue Data Catalog databases, tables, and partitions
    • Permissions to read from and write to Glue Studio
  • An S3 bucket for reading and writing data with AWS Glue. If you haven’t created one yet, you can do so from the AWS console (https://s3.console.aws.amazon.com/s3/home) | Create bucket. You can also create a bucket by running the aws s3api create-bucket --bucket <your_bucket_name> --region us-east-1 AWS CLI command.

Why do we need to pay attention to data layout?

As we discussed earlier, it’s important to maximize query performance for your analytic workloads because users need to quickly understand their situation and make decisions based on the query results. One of the most important phases in an analytic workload is the data extraction process, in which a computation engine retrieves your data from its location (a relational database, distributed storage, and so on) and reads the records. This is because many operations in analytic workloads consist of reading data and processing it into what we want based on the queries we run. These days, the computation itself in many data processing engines is effectively optimized by their communities, companies, and more. However, the data extraction process, especially retrieving and reading data from an external location, highly depends on your data layout (such as the number of files and the file format), network speed, and more. Therefore, to achieve optimal data extraction, we should carefully design our data layout to further optimize query performance.

When considering the data layout, you should mainly focus on the following three parts:

  • Key techniques to optimally storing data: This is the first part. When you store your data, you should pay attention to which file format and compression type you use, and whether you use partitioning and/or bucketing, because these techniques are important for optimizing your query performance. We’ll go through the details of these techniques in the Key techniques to optimally storing data section. Paying attention to how you store data helps the processor engine that actually runs your analytic queries, for example, by saving the time needed to compute the data schema (by choosing a file format that has a schema), by avoiding the processing of unnecessary files (by filtering your data in advance), and more.
  • Optimizing the number of files and each file size: This is the second part. It’s possible to save processing time by keeping the number of files as small as possible and by keeping each file size close to the computation engine’s chunk size, such as 64 MB, 128 MB, and so on. This is because we can potentially avoid the per-file handling overhead spent by the computation engine.
  • Optimizing data storage based on data access: This is the last part. Your data typically grows continuously, such as ongoing web access logs, data sent by IoT devices, and more. Generally, the larger the data size in your storage, the higher the storage cost you need to pay. Therefore, you often need to archive part of the data and keep the other parts, based on how the data is accessed, to decrease the storage cost and reduce unnecessary data access in your analytic workloads.

To achieve data retrieval as quickly as possible and enhance your analytic workloads, the next sections focus on the previously mentioned points to introduce a good data layout. In particular, this chapter shows how you can meet these requirements with AWS Glue, AWS Lake Formation, and Amazon S3.

Key techniques to optimally storing data

As mentioned earlier, the data extraction process is one of the most important phases to consider when optimizing your analytic workloads. In the usual data retrieval process, users such as data analysts, business intelligence engineers, and data engineers run queries against a distributed analytics engine such as Apache Spark or Trino. The distributed analytics engine then gets information about the data, such as each file’s location and metadata. Usually, this kind of data is stored in distributed storage such as Amazon S3, HDFS, and more. After getting all the information about the data, the computing engine actually accesses and reads the data that you specify in the queries. Finally, it returns the query results to the users.

To make the data retrieval process faster for further analysis, it’s important to consider how you store data. In particular, you can optimize workloads for analysis by storing data in the most suitable condition for your analysis. For example, when running analytic queries, if there are a lot of files in your storage, running queries takes more time than if there is a smaller number of files. This is mainly because a distributed analytics engine needs time to get the information about each file, such as its location and metadata. Based on that information, the computing engine retrieves the data from storage before processing it. In such cases, you can improve data retrieval time by consolidating the data into a smaller number of files and reducing each file’s size through compression so that it matches the size the computing engine can process (usually, this size is based on your computing engine’s memory capacity). This consolidation can typically be achieved by using the computing engines themselves.

To optimally store your data, you should pay attention to file formats, compression types, the splittability of files, and partitioning or bucketing, as they can all affect your analytic query workloads. We will learn more about these in the following sections.

Selecting a file format

Generally, data can be categorized into unstructured, semi-structured, and structured formats based on whether the data has a specific schema and types. If the data has specific key-value pairs but doesn’t have any typed schema, it can be classified as a semi-structured format such as JSON, CSV, or XML. If the data has specific columns and types, it can be classified as a structured format such as Apache Parquet, Apache ORC, Apache Avro, and more. Otherwise, the data can generally be thought of as an unstructured format, such as images and log files.

Selecting a file format affects your query performance. Structured data with a schema, like a relational database table, enables a data processing engine to avoid computing the data schema and to extract only the necessary data (for example, values in the columns you want to process) based on user-defined queries. In particular, it’s recommended that you use formats that have columnar data structures, such as Apache Parquet and Apache ORC, because these formats provide a lot of benefits for analysis. For example, Apache Spark, which is used by AWS Glue, can optimize queries over Parquet files by narrowing down access to records based on the Parquet format’s structure. We’ll see these benefits next, and then see how to convert your data to these columnar formats.

Storing your data in columnar formats for effective analytic workloads

As we’ve seen so far, Apache Parquet and Apache ORC are file formats that have table-like schemas and columnar storage. These formats make data processing for your analytic queries more effective thanks to columnar format features such as column metadata, filtering of columns and the relevant records, effective compression and encoding schemes, and more.

The actual data in Parquet files consists of row groups, which include arrays of column chunks. Parquet defines the size of the chunk of data used to store records, which includes column chunks and pages, as the block size. By default, this size is 128 MB. ORC has a similar chunk size for storing records, called the stripe size, which is 64 MB by default. Each stripe in ORC includes index data, row data, and a stripe footer. If you store data with a large block or stripe size, a processor can execute efficient column-based manipulations; however, this can cause multiple I/O operations due to multiple blocks in your storage. On the other hand, if you store data with a small block or stripe size, this also requires multiple accesses to each file and possibly reduces efficiency. Therefore, when you store data in the Parquet or ORC format, you should keep the default block or stripe size, or set a larger one if your data has a lot of columns.

Configuration of Parquet block or ORC stripe size in Glue Spark jobs

You can configure the block or stripe size by passing each relevant parameter to the option method of the Spark DataFrameWriter, as follows:

# 1024 * 1024 bytes = 1 MB Parquet block (row group) size
dataframe.write.option('parquet.block.size', 1024 * 1024).parquet(DST_S3_PATH)

# 1 MB ORC stripe size
dataframe.write.option('orc.stripe.size', 1024 * 1024).orc(DST_S3_PATH)

You can also effectively narrow down your data in the Parquet and ORC formats when filtering or querying values in particular columns. Many computation engines, such as Apache Spark, Apache Hive, and Trino/Presto, support a narrowing-down feature called predicate pushdown (or filter pushdown). Each block in Parquet and ORC files holds statistics about its chunk, such as the minimum and maximum values. This statistical information is used by your running query to determine which parts are necessary to read. If you sort the column values that you use for filtering before processing the data, this mechanism can further improve your analytic query performance.
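
The following is a minimal sketch of how predicate pushdown looks from the user's side in a Glue Spark job (the S3 path and the price column are assumptions, and spark is the Spark session created from the Glue context). Spark pushes the filter down to the Parquet reader, so row groups whose min/max statistics fall outside the predicate can be skipped without being read:

# A sketch of a filter that benefits from Parquet predicate pushdown
df = spark.read.parquet('s3://your-bucket/sales/')
filtered = df.where('price > 100').select('product_name', 'price')
filtered.show()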

Converting your data to Apache Parquet or Apache ORC formats with AWS Glue

You can convert your data files with a Glue ETL Spark job. Using AWS Glue Studio, you can create the Glue job, and it automatically generates the format conversion script. For how to use Glue Studio, please refer to the AWS Glue ETL and AWS Glue Studio section in Chapter 4, Data Preparation. The following example shows the steps to generate a format conversion script from JSON to Apache Parquet with Snappy compression. Follow these steps:

  1. Download the sample sales data (data.json) to your local machine from https://github.com/PacktPublishing/Serverless-ETL-and-Analytics-with-AWS-Glue/blob/main/Chapter05/data.json. Once the download is complete, upload the file to your Amazon S3 bucket using the aws s3 cp data.json s3://<your-bucket-and-path>/ command or from the S3 console (https://s3.console.aws.amazon.com/s3/buckets).
  2. Access Jobs on Glue Studio console (https://us-east-1.console.aws.amazon.com/gluestudio/home#/jobs).
  3. Choose Visual with a source and target, then choose Create at the top right of the page.
  4. In the Data source – S3 bucket node, set your S3 bucket and path, and choose Infer schema.
  5. In the Data target – S3 bucket node, set Format to Parquet, Compression Type to Snappy, and set your S3 bucket and path. This generates the diagram and script shown in the following screenshot.
Figure 5.1 – Format conversion Glue job diagram and script on Glue Studio console


  6. To run this file format conversion job, choose the Job details tab and complete all the information, such as Job name, IAM Role, Job type, and so on.
  7. After completing all the information, choose Save and then Run.

After the Glue job completes, you can see Parquet files with Snappy compression in the target S3 bucket and path.

Next, we’ll look at several data compression types that can decrease your data size.

Compressing your data

Reducing file size through compression enables you to save on data network transfer costs, reduce query processing time, reduce data storage usage, save on storage costs, and so on. For these benefits, you should store your data compressed. Note that you should pay attention to whether the compression type is splittable, to the compression and decompression speed, and to each compressed file’s size, as these can affect your query performance. We will look at file splittability in the Splittable or unsplittable files section and file size management in the Optimizing the number of files and each file size section.

The following table shows compression formats that are commonly used with Apache Parquet in Spark, such as gzip, lz4, snappy, and zstd, along with their compression ratios and compression/decompression speeds. Each compression ratio and (de)compression speed was measured by running actual data processing jobs (speeds are in seconds). Additionally, the ratios and speeds are normalized against the no-compression result and the gzip compression result, respectively:

Table 5.1 – Comparison of compression ratio and speed between compression types


Each value in the table was measured by running a Glue Spark job.

As shown in Table 5.1, compressing the data with gzip, lz4, snappy, or zstd reduces the file size compared to storing it without compression. In addition to the reduction in file size, the compression/decompression speed can affect your processing job. In particular, based on Table 5.1, a data processing job that uses gzip compression is expected to be slower than a job using the other compression types such as lz4, snappy, and zstd. Therefore, when compressing your data with a processing job to optimize the data in your storage, you should consider not only the compression ratio but also the compression speed so that you get compressed data as quickly as possible.

Note

Generally, the higher the compression ratio of an algorithm you specify, the more computation overhead is necessary to compress and decompress data.

So far, we’ve seen how compression affects your data and workloads. But how can we actually run a compression job on our data? We can compress our data with AWS Glue. Using Glue Studio, we can generate the compression Glue job script as we’ve seen in the Converting your data to Apache Parquet or Apache ORC formats with AWS Glue section. Specifically, we just choose a compression type for the Data target – S3 bucket node in Step 5 of the example in the previous section. The compression types you can choose from depend on your file format. For example, if you set Parquet as the format, you can choose Snappy, LZO, GZIP, or Uncompressed. The following example shows the partial code that is generated by Glue Studio and writes the Parquet files with GZIP compression:

S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
    frame=ApplyMapping_node2,
    connection_type="s3",
    format="glueparquet",
    connection_options={
        "path": "s3://your-target-bucket-and-path/",
        "partitionKeys": [],
    },
    format_options={"compression": "gzip"},
    transformation_ctx="S3bucket_node3",
)

You can also compress your data with a Spark DataFrame. If you use a Spark DataFrame for compression, you need to directly edit your Glue job script in Glue Studio. The following example shows the part of the job script that writes Parquet files with zstd compression:

# Write Parquet files compressed with zstd
COMPRESSION_CODEC = 'zstd'
(dataframe.write
    .option('compression', COMPRESSION_CODEC)
    .parquet(DST_S3_PATH))

The whole script is available at https://github.com/PacktPublishing/Serverless-ETL-and-Analytics-with-AWS-Glue/blob/main/Chapter05/compression_by_dataframe.py.

Note

Glue DynamicFrame currently doesn’t support zstd for reading and writing. You should use Spark DataFrame to compress/decompress data to/from zstd.
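
As a workaround, the following is a minimal sketch (the S3 path and frame name are assumptions, and spark is the Spark session from the Glue context) of reading zstd-compressed Parquet with a Spark DataFrame and then converting it to a DynamicFrame for further Glue transforms:

from awsglue.dynamicframe import DynamicFrame

# Read zstd-compressed Parquet with Spark, then wrap it as a DynamicFrame
df = spark.read.parquet('s3://your-bucket/zstd-data/')
dyf = DynamicFrame.fromDF(df, glue_context, 'zstd_dyf')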

Next, we’ll look at file splittability, which is determined by file format and compression type.

Splittable or unsplittable files

When you run analytic queries and process data, it’s helpful to know whether the files from your data source are splittable or not. A file being splittable means that a processor such as AWS Glue can read the contents of the file by separating it into chunks based on the processor’s chunk size. When a file is not splittable, a processor cannot separate it and needs to read the whole file.

Why do we need to think about whether a file is splittable? Usually, it affects your data retrieval. Let’s assume that your data files are not splittable and each file is larger than your available memory or storage. Because a file is not splittable, a processor cannot separate it into chunks and needs to read the whole file. However, the processor cannot process the file because each file is bigger than the memory and storage available to the processor. In particular, with Apache Spark, processing a large unsplittable file might cause an out-of-memory error because Spark processes the data in memory. In other words, if your data source has unsplittable files, you should control each file’s size appropriately for your processor.

Whether it’s splittable or not depends on what the file format is and/or how the file has been compressed. The following table shows popular file formats and compression types, and whether they’re splittable or unsplittable. Please check the files in your data source if you use a data processor such as AWS Glue, Amazon EMR, or Amazon Athena, which processes and writes the data in your storage:

Table 5.2 – The splittability of file formats and compression types


From Table 5.2, for example, if your data files are in XML format without compression, they’re splittable. As another example, if your data files are in JSON format with gzip compression, they’re unsplittable.

Partitioning

Partitioning is a technique to store your data separately into different folders based on specified partition keys. Each partition key is related to your data and actually acts as a column. For example, if you have your data in your Amazon S3 bucket as s3://bucket-name/category=drink/data.json, the partition key can be recognized as category, and its value is drink.

By partitioning your data, you can reduce the data scan size by querying only the required data. Specifically, a computation engine (such as Spark, Presto, and so on) only reads the data for the partition keys and values specified in your query. In the preceding example, if you specify drink for the category partition key, the engine only reads the data under the category=drink folder by listing the partition values for that key. This can reduce the data scan size and improve query performance.
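
As a minimal sketch (using a hypothetical product_sales table registered over this layout, and spark as the Spark session), a SQL-style query that filters on the partition column only scans the files under category=drink/:

# Only files under category=drink/ are scanned for this query
spark.sql("SELECT product_name, price FROM product_sales WHERE category = 'drink'")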

You can define a column as a partition key at table creation. The partition keys and values are registered in your table, which is stored in the Apache Hive Metastore. The Hive Metastore is a service that stores table metadata and related information in a database backend such as a relational database. More details about the Hive Metastore are discussed in the AWS Glue Data Catalog section in Chapter 2, Introduction to Important AWS Glue Features. The computing engine retrieves the list of partition values from the metastore based on the range of partition values specified in your query, and then it reads the data in the specified partitions. Therefore, partitioning enables a computation engine to filter partitions and avoid processing unnecessary ones.

When you partition your data, you should use Hive-style partitioning, such as /path/to/<partition_key_1>=<value1>/<partition_key_2>=<value2>, rather than a non-key-value style such as /path/to/value1/value2. With Hive-style partitioning, partition keys can be processed as table columns, and the values can be filtered with a WHERE clause in a SQL-like query, such as WHERE category = 'drink'. Also, you can automatically register the partition values for a key in your Hive Metastore with the MSCK REPAIR TABLE <your-table-name> Hive query, which can be run not only from Glue Spark jobs but also from Athena. For more details about Hive queries for Hive-style partitioned tables, please refer to https://docs.aws.amazon.com/athena/latest/ug/partitions.html.
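
The following is a minimal sketch (the product_sales table name is an assumption) of running that query from a Glue Spark job so that the partition values discovered on S3 are registered in the metastore:

# Register Hive-style partition values found under the table's S3 location
spark.sql('MSCK REPAIR TABLE product_sales')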

The Glue Data Catalog, which we saw in Chapter 2, Introduction to Important AWS Glue Features, can be used as an external Hive Metastore. You can register partition keys and values in your table in the Glue Data Catalog. For example, you can register category as a partition key and drink as one of its values in a Glue Data Catalog table over an S3 bucket structure such as s3://bucket-name/category=drink/<data files>. We look at how to register partition keys and values in the Glue Data Catalog in the Registering partition values in AWS Glue Data Catalog section below. By specifying a range of partitions, you can reduce the data scan size in your Glue ETL Spark job because the job only reads the data in the specified partitions. This can improve the Glue job’s performance.

Example - partitioning by AWS Glue ETL Spark job

In this example, we partition the data (data.json) in the S3 bucket with Hive-style partitioning using a Glue Spark job. Specifically, we lay out the S3 bucket with the following structure based on the data.json records. In this folder structure, the partition key is category, and its values are drink, grocery, and kitchen.

s3://bucket-name/
          ├── category=drink/<data files>
          ├── category=grocery/<data files>
          ├── category=kitchen/<data files>

To write your data with Hive-style partitioning in a Glue job, you can mainly use the partitionKeys option of a Glue DynamicFrame or the partitionBy method of a Spark DataFrame.

As we’ve seen in the Converting your data to Apache Parquet or Apache ORC formats with AWS Glue section, using Glue Studio, we can automatically generate a partitioning script by specifying partition keys for the Data target – S3 bucket node. In the following screenshot, category is specified as the partition key.

Figure 5.2 – Specifying partition key for Data target node


The following script is the partial code that is generated by Glue Studio based on this diagram. This script writes Snappy-compressed Parquet files with Hive-style partitioning as category=<partition_value>:

S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
    frame=ApplyMapping_node2,
    connection_type="s3",
    format="glueparquet",
    connection_options={
        "path": "s3://your-target-bucket-and-path/",
        "partitionKeys": ["category"],
    },
    format_options={"compression": "snappy"},
    transformation_ctx="S3bucket_node3",
)

If you use a Spark DataFrame for partitioning, you need to directly edit your Glue job script in Glue Studio. The following example shows the part of the job script that writes Snappy-compressed Parquet files partitioned by category:

# The default Parquet compression type in Spark is snappy.
(dataframe.write
    .partitionBy('category')
    .parquet(DST_S3_PATH))

The whole script is available at https://github.com/PacktPublishing/Serverless-ETL-and-Analytics-with-AWS-Glue/blob/main/Chapter05/partitioning_by_dataframe.py

Best practice to select partition keys

Pay attention to the number of partitions when you select partition keys for your data. The more partitions a table has, the higher the overhead of processing the partition metadata. Therefore, you should choose a low-cardinality column as a partition key. Also, avoid choosing a partition key with heavily skewed values, to lower the overhead of filtering values. Usually, we use year, month, day, category, region, and so on as partition keys.
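
For date-based layouts, the following is a minimal sketch (the column names and output path are assumptions) of partitioning by several low-cardinality keys at once:

# Partition the output by year, month, and day columns in the data
(dataframe.write
    .partitionBy('year', 'month', 'day')
    .parquet('s3://your-bucket/partitioned/'))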

If you create a table in the Glue Data Catalog based on your data by using the Glue crawler, Athena DDLs, and so on, you can define columns as partition keys in the table registered in the Glue Data Catalog. The Glue Data Catalog, which we saw in Chapter 2, Introduction to Important AWS Glue Features, supports partition columns.

The following output of the AWS CLI get-table command shows the metadata of a table that was created based on the example dataset. You can see the columns and the category partition key as follows:

$ aws glue get-table --database-name db_name --name product_sales
{
    "Table": {
        "Name": "product_sales",
        "DatabaseName": "db_name",
        ...
        "StorageDescriptor": {
            "Columns": [
                {
                    "Name": "product_name",
                    "Type": "string"
                },
                {
                    "Name": "price",
                    "Type": "long"
                },
                ...
        },
        "PartitionKeys": [
            {
                "Name": "category",
                "Type": "string"
            }
        ],
...

To identify each partition column value for data retrieval by AWS Glue, Amazon Athena, Amazon EMR, and Amazon Redshift Spectrum, you need to register the values of the partition key in your Glue Data Catalog table.

Registering partition values in AWS Glue Data Catalog

Primarily, there are four ways to reflect those partition column values in the Glue Data Catalog:

  • Glue DynamicFrame: Adding partitions by Glue ETL jobs. An example of the script is shown at https://docs.aws.amazon.com/glue/latest/dg/update-from-job.html#update-from-job-partitions.
  • Spark DataFrame: Running saveAsTable with partitionBy, as in the following example:

    # PySpark example
    (your_data_frame.write
        .mode('overwrite')
        .partitionBy('<partition_column>')
        .option('path', 's3://your-bucket/path/')
        .saveAsTable('db.table'))

The preceding example registers a table with a partition column (<partition_column>) in the Glue Data Catalog, and it also writes the data to Amazon S3. The data is written under the specified S3 path with the partition column and its value appended, such as s3://your-bucket/path/<partition_column>=<value>/.

Using the first two ways, you can write the data and add the partition values to your Glue Data Catalog simultaneously. The other operations simply add the partition values to the Glue Data Catalog, so you run them after writing your data with partitioning.

Partition pruning in AWS Glue

If you use a Glue DynamicFrame to read data from partitioned tables in the Glue Data Catalog, you can use data filtering queries that enable your Glue Spark job to avoid processing partitions that are unnecessary for your analysis. The DynamicFrame supports the following two types of data filtering queries:

  • Predicate pushdown: This enables your Glue Spark job to filter partitions. The filtering happens on the client (Spark job) side. It works as follows:
    1. The Glue job first retrieves all partitions that are registered in the Glue Data Catalog and keeps them as a partition list.
    2. The job filters the partitions in the list based on the specified predicate pushdown query.
    3. The job reads the data located in the partitions filtered in Step 2.
  • Catalog-side predicate pushdown: This is also a query to prune partitions, like the predicate pushdown, but the partition pruning happens on the server (Glue Data Catalog) side. It works as follows:
    1. The Glue job requests the specified partitions registered in a table from the Glue Data Catalog.
    2. The Glue Data Catalog filters the partitions on the server side and returns the filtered partition list to the job based on the request in Step 1.
    3. The job reads the data located in the specified partitions.

These predicate pushdowns contribute to making data retrieval faster compared to retrieving all data in your storage by the processing job.

You can use predicate pushdown by specifying the push_down_predicate option for a DynamicFrame. You can also use it with Spark SQL by specifying partitions in the WHERE clause. In the following example, the DynamicFrame only reads the data in the partition whose category is grocery by setting "category == 'grocery'" as the push_down_predicate option:

# PySpark example of a pushdown predicate
glue_context.create_dynamic_frame.from_catalog(
    database="db_name",
    table_name="product_sales",
    push_down_predicate="category == 'grocery'")

Also, you can use catalog-side predicate pushdown by specifying catalogPartitionPredicate for a DynamicFrame. Please note that partition indexes in AWS Glue, which we’ll see next, need to be enabled to use the catalog partition predicate. In the following example, the Glue DynamicFrame reads the data in the partition where category is grocery:

# PySpark example of a catalog partition predicate
glue_context.create_dynamic_frame.from_catalog(
    database="db_name", 
    table_name="product_sales",
    additional_options={"catalogPartitionPredicate":"category=='grocery'"})     

As discussed earlier, catalog-side predicate pushdown prunes partitions on the Glue Data Catalog side instead of on the processing job side. Catalog-side predicate pushdown can be much faster than predicate pushdown on the job side if there are a lot of partitions, such as millions of partitions, in your Amazon S3 bucket.

Running queries faster with partition indexes

Partition indexes (https://docs.aws.amazon.com/glue/latest/dg/partition-indexes.html) in AWS Glue are a functionality of the Glue Data Catalog. They reduce the time a query spends filtering partitions in Glue Data Catalog tables. The partition filtering works on the Glue Data Catalog side instead of all partitions being returned to a requester. Once you set partition indexes on your partitioned table, a requester (typically, a Glue job) retrieves only the partitions it requested. If a partition index is not enabled, all partitions in the Glue Data Catalog table are returned to the requester, and the requester then needs to choose the partitions that you want to query. Using partition indexes can increase query performance and save costs, such as the cost of requests to the Glue Data Catalog table.
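
The following is a sketch of adding a partition index with boto3 (the database, table, and index names are assumptions, and the create_partition_index API is assumed to be available in your boto3 version):

import boto3

# Create a partition index on the 'category' partition key so that
# catalog-side partition filtering can use it
glue = boto3.client('glue')
glue.create_partition_index(
    DatabaseName='db_name',
    TableName='product_sales',
    PartitionIndex={'Keys': ['category'], 'IndexName': 'category_index'})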

Bucketing

Bucketing is a technique used to divide data into groups of rows (buckets) based on one or more specified columns. It can reduce the amount of data processed by filtering out unnecessary rows based on the bucketing information when you specify the bucketed columns in your queries. Bucketing can therefore improve your query performance and accelerate your analytic workloads, too.

You can also specify a bucketed column at table creation. When you set a column as the bucketed column, you should choose one with high cardinality that is often used for filtering the data. The Glue Data Catalog supports bucketing. If you specify bucketing at table creation, the bucketing columns are defined in the StorageDescriptor part of the Data Catalog. On the other hand, when Spark writes the data with bucketing, Spark adds the Spark-specific bucketing information as table parameters in the Data Catalog.

To write your data with bucketing to S3 from Glue ETL Spark jobs, you mainly use the bucketBy method of a Spark DataFrame. MurmurHash (https://en.wikipedia.org/wiki/MurmurHash) is used by Spark and Glue by default. Please note that the Glue DynamicFrameWriter doesn’t support bucketing when writing. For example, you can write data with bucketing, such as the product sales table based on data.json, by using a DataFrame as in the following example. You need to pass the number of buckets and one or more columns to the bucketBy method:

# PySpark example of setting the number of buckets to 10 and the bucketed column to 'customer_id'.
# Note that bucketBy requires saveAsTable; a plain path-based save is not supported.
# The table name below is an example.
(your_data_frame.write
    .bucketBy(10, 'customer_id')
    .option('path', 's3://<your-bucket>/<path>/')
    .format('parquet')
    .saveAsTable('db.product_sales_bucketed'))

There are primarily two ways to reflect the bucket column values in the Glue Data Catalog so that Glue ETL jobs can identify the columns as bucketed columns in the data retrieval phase.

By using saveAsTable with a Spark DataFrame, you can write data with bucketing and add the bucketing information to your Glue Data Catalog simultaneously. The other option requires creating a new table and adding the bucketing information to the Data Catalog at the time of the new table’s creation.

Note

If you are creating a table using bucketing with Athena DDL, you can see the Athena DDL syntax at https://docs.aws.amazon.com/athena/latest/ug/create-table.html. In addition to the DDL, Athena CTAS can also be operated to define and register the bucketing information. An example of the CTAS query, including a definition of bucketing, is shown at https://docs.aws.amazon.com/athena/latest/ug/ctas-examples.html#ctas-example-bucketed.

We’ve seen how to store data optimally, focusing on topics such as file formats, compression types, file splittability, and partitioning/bucketing. Next, we’ll look at the second topic, optimizing the number of files and each file size, which you need to consider when optimizing your analytic queries.

Optimizing the number of files and each file size

The number of files and the size of each file are also related to the performance of your analytic workloads. In particular, they affect the performance of the data retrieval phase performed by the analytic engine in your workloads. To understand this relationship, let’s look at how an engine generally retrieves data and returns a result.

The basic process of data retrieval and returning a result is to first get a list of files, read each file, process the contents of the files based on your queries, and then return the result. In particular, when processing data in Amazon S3, the analytic engine lists the objects in your specified S3 bucket, gets the objects, reads their contents, then processes them and returns the result. When you use an AWS Glue ETL Spark job to process your data in S3, during data retrieval, the Spark driver in the Glue job lists the objects in the S3 bucket, and then the Spark executors in the Glue job get the objects based on the list produced by the driver.

Therefore, the greater the number of files in your storage, the longer the listing takes. In addition, if your data source consists of a lot of small files, it also takes longer to process the data across those files because it needs more file I/O than a smaller number of files would. Therefore, managing the number of files and each file’s size is important for the data retrieval process performed by the analytic engine.
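
Before optimizing, it can help to measure how fragmented your data is. The following is a minimal sketch with boto3 (the bucket name and prefix are assumptions) that counts the objects and sums their sizes under a prefix:

import boto3

# Count objects and total size under a prefix to gauge file fragmentation
s3 = boto3.client('s3')
paginator = s3.get_paginator('list_objects_v2')
num_files, total_bytes = 0, 0
for page in paginator.paginate(Bucket='your-bucket', Prefix='logs/'):
    for obj in page.get('Contents', []):
        num_files += 1
        total_bytes += obj['Size']
print(f'{num_files} files, {total_bytes / 1024 ** 2:.1f} MB in total')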

What is compaction?

We store various types of logs, such as web access logs, application logs, and IoT device logs, in storage such as Amazon S3. These logs are delivered by applications and devices continuously and periodically (at relatively short intervals, from seconds to minutes). Furthermore, these logs often consist of small files of a few kilobytes or megabytes. Therefore, as the logs are delivered into your storage, the number of small files increases. Usually, this can lead to a situation where there is a huge number of small files in your storage, such as 100 million files of 1 KB each.

If you run your analytic workloads directly on data that consists of a lot of small files, the query time is expected to increase because listing the files in the data retrieval phase takes the analytic engine a lot of time. Therefore, when running a processing job, you need to transform data made up of many small files into data with an appropriate number of files and an appropriate size for each file. This action of merging small files into larger ones and arranging the data is called compaction. Compaction is necessary to mitigate the small-files problem, which increases query time and affects your analytic workloads. The following table shows a performance comparison of counting records with a Spark DataFrame between non-compacted data and compacted data:

Table 5.3 – Comparison of the speed of record count by a Spark DataFrame between non-compacted data and compacted data (this speed is measured by seconds)


As you can see in the preceding table, counting the records of the compacted data is about 66 times faster than counting those of the non-compacted data. Based on this result, we can see that compaction greatly contributes to increasing query performance when the compacted and non-compacted data have the same total size.

In the following sections, we’ll see how you can run compaction on your data with AWS Glue, which provides flexible ways to implement the basic compaction steps using Spark. In addition to Glue, you can also use AWS Lake Formation’s automatic compaction functionality, which runs compaction on your specified data automatically; we’ll learn about it at the end of this section.

Compaction with AWS Glue ETL Spark jobs

You can process your data, merge the files, and store the data in a columnar format using Glue ETL Spark jobs to optimize your analytic workloads. To build automatic compaction, you essentially need to consider the following two key things in the compaction process:

  • How do you determine the number of files after the compaction process?
  • How do you control each file size through the compaction process?

You can control the number of output files in a Glue ETL Spark job. Additionally, you can manage each file’s size by controlling the number of files when the Glue job writes the data to your storage, such as Amazon S3, and by specifying the file format and compression.

Essentially, Spark determines the number of output files based on the number of Spark partitions, which determines the level of concurrency when processing data. The number of partitions is determined by the input splits (for example, the split size in EMRFS, which is defined by fs.s3.block.size, the HDFS block size, and more). Additionally, the number is determined by the operations on your data in Spark, such as spark.sql.shuffle.partitions and spark.default.parallelism, which define the number of partitions after shuffle operations.

In a Glue job, by setting the number of partitions just before writing data with Spark, your Glue job writes the data with the same number of files as the number of partitions specified. You can control the number of partitions using the repartition(<number>) or coalesce(<number>) methods for a Glue DynamicFrame or Spark DataFrame. Please note that there is currently no option to specify the output file size in Spark when writing data. Therefore, to control the number of files and each file size by Spark, you need to control the number of partitions in your Spark application (Glue job).
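
For example, the following is a minimal sketch (the input size and output path are assumptions) of deriving the number of partitions from a known total input size and a target output file size of about 128 MB, and repartitioning before the write:

import math

# Assumed total input size and target output file size (before compression)
total_input_bytes = 16 * 1024 ** 3        # e.g., 16 GB of input data
target_file_bytes = 128 * 1024 ** 2       # target of roughly 128 MB per file
num_partitions = max(1, math.ceil(total_input_bytes / target_file_bytes))

# Repartition so that each output file lands close to the target size
(dataframe.repartition(num_partitions)
    .write.parquet('s3://your-bucket/compacted/'))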

The following steps show an example of compaction process by a Glue job:

  1. Check the total size of input files and the number of files. 
  2. If possible, process a small part of the data with Spark and check the compression ratio of the output file size to the input file size (columnar formats such as Parquet and ORC are good as output file formats for analytic workloads).
  3. Based on the compression ratio and each file’s size, compute and set the number of output partitions. It’s good to start with a target of 64 or 128 MB per file to efficiently process data with a Glue job.
  4. Update the number of partitions with the repartition() or coalesce() method based on your input file size.

A sample compaction script is provided by AWS in its GitHub repository (https://github.com/awslabs/aws-glue-blueprint-libs/blob/master/samples/compaction/compaction.py). The compaction process in this script roughly works as follows:

  1. The Spark partition number and size are calculated by listing the objects in a specified S3 folder in the get_partition_num_and_size method.
  2. If the partition size control option (enable_size_control) is set to true, the optimal number of files per partition (optimal_file_num) is calculated based on the calculated partition number and size.
  3. The number of partitions is updated by the coalesce() method with the calculated optimal file number, and the job then writes that number of files.

Automatic Compaction with AWS Lake Formation acceleration

The Lake Formation acceleration feature automatically runs compaction on your data. This compaction is a background process and doesn’t affect your analytic workloads, and you don’t have to implement a compaction Glue ETL job that reads your data and merges and compresses it into new files. To enable this feature, you need to create a table whose table type is GOVERNED. You can create a GOVERNED table by checking the Enable governed data access and management box under Create Table in Tables in the Lake Formation console navigation pane, as shown in the following screenshot. After checking it, automatic compaction is turned on automatically. Once the GOVERNED status has been enabled for a table, Lake Formation starts monitoring your data and runs compaction jobs internally without interfering with concurrent queries:

Figure 5.3 – Enabling governed status for a table


At the time of writing, this compaction feature is supported only for partitioned tables in the Parquet format. Next, we’ll look at how to optimize our data layout with Amazon S3 functionalities.

Optimizing your storage with Amazon S3

So far, we’ve seen how to store data optimally and how to manage data to optimize data retrieval and accelerate analytic workloads. These techniques primarily work on the data itself, such as storing data in columnar formats, data compaction, and more. In addition to handling the data itself optimally, it’s also important to think about optimization on the storage side.

Our data, such as web access logs, device data, and so on, is continuously generated, and its size grows over time. As storage usage increases, the cost increases, too. To reduce the cost of storage usage, we usually archive data that is infrequently accessed or never accessed. Generally, we can divide data into the following tiers based on how frequently it is accessed:

  • Hot: This is data that you usually access.
  • Warm: This is data that you access or require less frequently than hot data.
  • Cold: This is data that you infrequently access or almost do not require.

Based on the three preceding tiers, we usually select storage hardware and configure replication policies.

Amazon S3 provides more flexible storage options that you can select from. By selecting suitable options for your data and archiving your data effectively, you can reduce not only the storage cost but also the data retrieval time. In this section, we’ll look at the S3 storage classes, the object lifecycle management that S3 also provides, and how to archive or delete unnecessary or infrequently accessed data with AWS Glue.

Selecting suitable S3 storage classes for your data

You can see the storage classes that S3 provides and the main usage of each storage class in the table under the Comparing the Amazon S3 storage classes section of the AWS documentation (https://docs.aws.amazon.com/AmazonS3/latest/userguide/storage-class-intro.html). Based on your data usage and access patterns, you should select a suitable class for your data. If you process the data with AWS Glue, Glue has options to exclude objects in specific storage classes and also has methods to change the storage class of objects. We’ll see these options and methods in the Excluding S3 storage classes, archiving, and deleting objects with AWS Glue section.

Using S3 Lifecycle for managing object lifecycles

S3 Lifecycle runs automatic actions on the objects in your storage based on your lifecycle configurations. You can set a lifecycle using the Management tab in your bucket view.

Firstly, you need to set the scope of automatic actions, such as Limit the scope of this rule using one or more filters (filter-based action) or Apply to all objects in the bucket (applying to all objects action), from Choose a rule scope in the following screenshot. If you select filter-based actions, you can set the filtering condition, such as Prefix or Object tags, as follows:

Figure 5.4 – The condition of automatic lifecycle actions


Then, you define the actual lifecycle actions on your objects with which the lifecycle configuration is applied. There are two types of provided actions:

  • Transition actions: These define when objects move to another storage class. You can set the number of days after an object is put on Amazon S3 before it moves to another storage class, such as the STANDARD-IA class. If you have old data that you never use or access infrequently, such as data created more than 30 days ago, you should consider setting this action. By setting an action that moves old objects into archival storage classes, such as STANDARD-IA, you can decrease your Amazon S3 storage cost.
  • Expiration actions: These define when objects expire or are deleted. You can set the number of days after an object is put on Amazon S3 before it expires or is deleted. If you have old data that was created years ago and that you no longer need to access, you can remove that data by setting this action. By removing unnecessary data, you can decrease not only your storage usage but also the cost of that usage.

You can choose one or more rules, such as changing a current storage class or removing objects from the list, on the page shown in the following screenshot. The first two actions are transition actions, while the others are expiration actions:

Figure 5.5 – The list of lifecycle actions


Please note that lifecycle configurations are applied not only to new objects but also to all existing objects once you set the configuration. For more details about the S3 Lifecycle, please refer to https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-lifecycle-mgmt.html.

By setting S3 Lifecycle rules, we can manage the data lifecycle. In particular, there are two types of action that you can configure for your Amazon S3 bucket: Transition, which changes the data storage class, and Expiration, which expires or deletes the data. These actions are triggered a configured number of days after an object’s creation. Therefore, the S3 Lifecycle automatically archives your data and runs garbage collection without you implementing custom code.
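
If you prefer to set this up programmatically, the following is a minimal sketch with boto3 (the bucket name, prefix, and rule values are assumptions) that transitions objects under logs/ to STANDARD-IA after 30 days and deletes them after 365 days:

import boto3

# Apply a lifecycle rule: transition after 30 days, expire after 365 days
s3 = boto3.client('s3')
s3.put_bucket_lifecycle_configuration(
    Bucket='your-bucket',
    LifecycleConfiguration={
        'Rules': [{
            'ID': 'archive-then-expire-logs',
            'Status': 'Enabled',
            'Filter': {'Prefix': 'logs/'},
            'Transitions': [{'Days': 30, 'StorageClass': 'STANDARD_IA'}],
            'Expiration': {'Days': 365},
        }]
    })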

Next, we’ll look at the functionalities of Glue for skipping data with a specific storage class, transitioning a storage class of your data, and deleting your data using Glue ETL jobs.

Excluding S3 storage classes, archiving, and deleting objects with AWS Glue

AWS Glue provides functionalities that work with S3 storage classes and can delete unnecessary objects. In particular, we’ll see the following functionalities that Glue provides for archiving and deleting data:

  • Excluding S3 storage classes: AWS Glue ETL jobs can process data across multiple storage classes while excluding specific storage classes.
  • Transition of a storage class: Transition the storage class of the files in a specified S3 path or in the location pointed to by a database and table in the Glue Data Catalog.
  • Purge objects: Delete the files in a specified S3 path or in the location pointed to by a database and table in the Glue Data Catalog.

Now, let’s take a look at them in detail.

Excluding S3 storage classes with the excludeStorageClasses option

You can filter the S3 storage classes in your AWS Glue ETL jobs to avoid failing to read data in specific classes such as GLACIER and DEEP_ARCHIVE. In particular, you can filter them by passing the excludeStorageClasses option to a DynamicFrame when creating it. For more details, please refer to https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-storage-classes.html#aws-glue-programming-etl-storage-classes-dynamic-frame.
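
The following is a minimal sketch (the database and table names are assumptions) of reading from the Data Catalog while skipping objects that are already in the GLACIER or DEEP_ARCHIVE storage classes:

# Skip objects in archival storage classes when creating the DynamicFrame
dyf = glue_context.create_dynamic_frame.from_catalog(
    database='db_name',
    table_name='product_sales',
    additional_options={'excludeStorageClasses': ['GLACIER', 'DEEP_ARCHIVE']})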

Transitioning a storage class with the transition_s3_path or transition_table method

You can transition your file storage class to another class. When you want to archive specific partitions after running compaction on the data in the partitions, you can use this method and archive files in the partitions with partitionPredicate.

Here’s a simple script demonstrating how to run the transition_table method to transition objects in the specific partition (month=5). After running the script, all objects in the month=5 partition are transitioned to the Glacier storage class:

# PySpark script to transition objects in the month=5 partition to GLACIER immediately.
glue_context.transition_table(
    database='db_name',
    table_name='table',
    transition_to='GLACIER',
    options={
        'retentionPeriod': 0,
        'partitionPredicate': '(month==5)'})

You can filter objects by not only partition predicates but also retention periods. For more details about transition operations in Glue, please refer to https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-transition_table.

Deleting objects with the purge_s3_path or purge_table method

You can delete your files from Glue ETL jobs with the purge_s3_path or purge_table method. When you want to delete objects in a specific partition after running compaction on the data in the partition, you can use this method and delete the files in the partition with partitionPredicate. Additionally, you can remove partition values from the Glue Data Catalog.

Here’s a simple script demonstrating how to run the purge_table method to delete objects from the specific partition (month=5) and also delete the partition value from the Glue Data Catalog. After running the script, all objects in the month=5 partition are deleted and the partition value registered in Data Catalog is also deleted:

# PySpark script to delete objects in the month=5 partition immediately.
glue_context.purge_table(
    database='db_name',
    table_name='purge_table',
    options={
        'partitionPredicate': '(month==5)', 
        'retentionPeriod': 0})

You can filter objects by not only partition predicates but also retention periods. For more details about purge operations in Glue, please refer to https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-purge_table.

Summary

In this chapter, we learned how to design the data layout to accelerate our analytic workloads. In particular, we learned about it by focusing on three parts, including how we store our data optimally, how we manage the number of files and each file size, and how we optimize our storage by working with Amazon S3.

In the first part, we learned techniques for storing our data optimally. These techniques include choosing file formats and compression types, understanding file splittability, and partitioning/bucketing. Then, we learned about data compaction to manage the number of files and each file size and to enhance analytic query performance. In the last part, we learned how to optimize our storage with Amazon S3 and Glue DynamicFrames. You can use your storage effectively by archiving, expiring, and deleting your data with Amazon S3 Lifecycle configurations and the Glue DynamicFrame methods.

Managing the data in your data lake with the techniques introduced in this chapter will solve a lot of problems, such as slow queries, analytic costs, storage costs, and more. In Chapter 6, Data Management, we’ll see how to manage data for various use cases by diving into what kinds of analysis we can do and who conducts the analysis by running queries.

Further reading

To learn more about what we’ve touched on in this chapter, please refer to the following resources:
