Chapter 12: Optimizing Cost and Performance with Delta

"You have to perform at a consistently higher level than others. That's the mark of a true professional."

– Joe Paterno, Football Coach

In the previous chapter, we saw how Delta helps harden data pipelines and make them production worthy. Now that the pipeline is in action, crunching massive datasets every day, we want it to be as lean and mean as possible, extracting every ounce of performance so that the infrastructure investment goes as far as possible while still serving the needs of the business users. In addition, the end-to-end SLAs of these pipelines need to keep shrinking. Both cost and speed are driving requirements, which is why the key metric is price performance: both aspects need to be optimized together.

In this chapter, we will explore the ways that data engineers can keep up with the performance demands at an optimum price point. Data patterns change over time, so tuning activities have to be continuously performed to keep up with them. In particular, we will look at the following topics:

  • Improving performance with common strategies
  • Optimizing with Delta
  • Is cost always inversely proportional to performance?
  • Best practices for managing performance

Let's look at how Delta helps to extract every ounce of performance from your pipeline.

Technical requirements

To follow along with this chapter, make sure you have the code and instructions as detailed on GitHub here:

https://github.com/PacktPublishing/Simplifying-Data-Engineering-and-Analytics-with-Delta/tree/main/Chapter12

The proposed roadmap for migrating select edge features from Databricks to open source Delta, primarily for performance enhancements, is tracked at https://github.com/delta-io/delta/issues/920.

Let's get started!

Improving performance with common strategies

The performance of a pipeline refers to how quickly a data load can be processed, while throughput is the volume of data that can be processed in a given amount of time. In a big data system, both are important measures of scalability. Let's look at ways to improve performance:

  • Increase the level of parallelism: The ability to break a large chunk into smaller independent chunks that can be executed in parallel.
  • Better code: Efficient algorithms and code help to crunch through the same business transformations faster.
  • Workflow that captures task dependencies: Not all tasks can run independently; there are inherent dependencies between tasks, and pipelining or orchestration refers to chaining these dependencies as DAGs, where the lineage determines which tasks can run simultaneously and which ones need to wait until all their dependent stages have completed successfully. Even better is the option to share compute across some of these tasks and the ability to repair and rerun only the affected part of the pipeline in case of a localized failure. Relying on external time-based schedulers to stitch tasks together invariably fails, as an unexpected spike in data volume or a temporary glitch in resource availability can cause timelines to slide and tasks to stampede on one another.

Distributed architectures such as Spark can scale both vertically and horizontally to finish the job faster. Vertical refers to larger worker nodes and horizontal refers to a greater number of workers. There is an upper limit to vertical scaling bound by the largest available server node. Although there is no limit to horizontal scaling, the mere addition of parallel worker nodes may not yield better performance because the workload may not be designed with the right partition strategy to take advantage of the load distribution. Spark's cost-based optimizer improves query plans by using table- and column-level statistics. ANALYZE TABLE commands are used to collect these stats.
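For example, here is a minimal sketch of collecting those statistics with ANALYZE TABLE (the table and column names are illustrative, and an active SparkSession named spark with Delta configured is assumed):

    # Collect table-level statistics (row count, size in bytes)
    spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS")

    # Collect column-level statistics for the columns used most often in filters and joins
    spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS FOR COLUMNS customer_id, order_date")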

In the Hadoop world, a partition is the lowest common denominator. So, if a record needs to be modified or deleted, the whole partition is dropped and recreated, which is a very expensive operation. A partition consists of several files, so dealing with file-level granularity allows for faster operations, which is how Delta achieves its fine-grained operations and the associated performance benefits. Optimum file size then becomes an important consideration: files that are too small or too large both have negative ramifications on query performance. Known queries are those whose access patterns are well understood, typically feeding dashboards and reports. Delta provides configuration knobs that make it easy to tweak these settings per use case, which is an important consideration.

Understanding common patterns and optimizing for those is imperative. This is where optimizations around data layout and the proximity or locality of relevant data that is queried together come in handy. Interactive exploratory queries are also important, but the user is usually a little more patient and can tolerate longer delays, as such queries usually scan much bigger data volumes. It is usually a good idea to bucket queries into small, medium, and large sizes and provide SLA expectations for each category.

There are some standard things to look out for when trying to tune a workload:

  • What is the real bottleneck – CPU, memory, or I/O?
  • What horsepower/node type is right for the job?
  • Does it need horizontal or vertical scaling?
  • What is the range of auto-scaling that is prudent?
  • Will it benefit from GPUs?
  • Is there caching?

A pipeline involves several distinct stages and it is important to ensure that they are all performant. These stages include the following:

  • ETL aspects
    • Ingesting data.
    • Processing and transformations to model the data for the use case at hand in the lake.
    • Enriching data for relevant stakeholders.
    • The primary persona involved here is that of a data engineer.
  • Consumption aspects
    • Getting data ready for consumption by BI and AI personas.
    • Ingestion strategies may not match the consumption patterns, which may require different partitioning strategies, file layouts, and so on. For example, while designing a pipeline for ingestion, you may be concerned with how quickly the data can be brought in, so your partitioning could be by processing date, whereas a consumption pattern may care about the event date. So, it is important to understand which patterns are most common and cater to those first.
    • The primary personas involved here are that of a business analyst or an ML practitioner.

Tuning activities in all these stages are additive and help get the insights in the hands of the stakeholders faster, reducing the analysis time and execution of the actionable insights.

The following diagram makes the distinction between the performance optimization activities in the ETL stage and the additional considerations for the consumption stage. In addition, there are administrative activities that need to be periodically applied to the data to tune file size and generate statistics or get rid of old versions of the data that are no longer necessary. In the next section, we will look at these Delta-specific operations, namely the following:

  • OPTIMIZE helps with file compaction to avoid the small file problem.
  • ANALYZE generates additional statistics metadata that is stored in the metastore and helps features such as Adaptive Query Execution (AQE) modify and refine the query plan on the fly.
  • VACUUM removes data files (not log files) that are no longer referenced by the Delta table and are older than the configured retention threshold, which by default is 7 days.
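
The following is a minimal sketch of these maintenance operations run as a scheduled job (the table name is illustrative; VACUUM's retention is expressed in hours, with 168 hours corresponding to the 7-day default):

    # Compact small files into larger ones
    spark.sql("OPTIMIZE events")

    # Refresh statistics used by the cost-based optimizer
    spark.sql("ANALYZE TABLE events COMPUTE STATISTICS")

    # Remove unreferenced data files older than the retention threshold (7 days = 168 hours)
    spark.sql("VACUUM events RETAIN 168 HOURS")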

Figure 12.1 – Performance tuning for ingestion versus consumption patterns

In the next section, we will look at where to look for slow queries and what to look for in order to begin the optimization process incrementally.

Where to look and what to look for

The Spark UI offers several views into the underlying query plan so that you can spot bottlenecks and address them appropriately by providing more compute, memory, or I/O. The explain command can be used to review the plan and verify that the generated statistics are being used to optimize queries (a short example follows the list below). Big data pipelines can suffer from the following syndromes:

  • Spill
    • When the dataset is too large to fit in memory, it will spill to disk, and disk operations are more expensive, which is why spills should be avoided.
  • Shuffle
    • When the data operation requires the movement of data across workers, the internode communication could slow down operations, which is why shuffles should be reduced. Operations such as groupBy, orderBy, and sort will result in a shuffle.
  • Skew/stragglers
    • When data is unevenly distributed across partitions, a few straggler tasks run long on a handful of worker nodes while the rest of the nodes sit idle, burning compute but not actively participating in the computation.
  • Slow functions
    • These are usually unoptimized user-defined functions that need to be reviewed and optimized. The thread dump of the executors will reveal slow functions.
  • Small files
    • Continuous ingestion of microbatches of data leads to small files, which affects query performance and hence needs coalescing and compaction.
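
As mentioned earlier, the explain command is the starting point for spotting these syndromes. A minimal sketch (the query and table names are illustrative):

    # Inspect the physical plan for shuffles (Exchange), scans, and join strategies
    spark.sql(
        "EXPLAIN FORMATTED SELECT region, count(*) FROM events GROUP BY region"
    ).show(truncate=False)

    # The same information is available on a DataFrame
    df = spark.table("events").groupBy("region").count()
    df.explain(mode="formatted")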

In the next section, we will see what options Delta provides to address some of these common concerns.

Optimizing with Delta

Delta's support for ACID transactions and quality guarantees helps ensure data reliability, thereby reducing superfluous validation steps and shortening the end-to-end time. This means less downtime and fewer triage cycles. Delta applies fine-grained updates, deletes, and merges at the file level instead of to the entire partition, leading to less data manipulation and faster operations. This also requires fewer compute resources, leading to cost savings.

Changing the data layout in storage

Optimizing the layout of the data can help speed up query performance, and there are two ways to do so, namely the following:

  • Compaction, also known as bin-packing
    • Here, lots of smaller files are combined into fewer large ones.
    • Depending on how many files are involved, this can be an expensive operation and it is a good idea to run it either during off-peak hours or on a separate cluster from the main pipeline to avoid unnecessary delays to the main job.
    • The operation is idempotent, so running it more frequently does not incur additional compute if no new data has arrived since the last run.
    • The optimize operation can be done on a table, on a file path, or on a smaller subset of the table by specifying a where clause.
    • The default target file size is 1 GB. However, you can alter that by specifying the value in bytes for two parameters: optimize.minFileSize and optimize.maxFileSize (see the example after this list).
    • The default value of the maximum number of threads that run in parallel to run the optimize command can be increased by tweaking optimize.maxThreads.
  • Data skipping
    • This feature is enabled by default. Statistics such as the min/max values of a column are collected on the first set of columns of the table (configured via delta.dataSkippingNumIndexedCols), which helps queries run faster. Columns with long values, such as strings, should be placed later in the schema to avoid collecting expensive stats for them. The recommendation is to structure tables so that numerical keys and high-cardinality query predicates sit to the left and long strings and dates to the right to take advantage of the automatic statistics collection. These stats are kept in the Delta transaction log. The number of indexed columns can also be set as a table property.
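
A minimal sketch tying these settings together (the table name, partition predicate, and values are illustrative; the optimize.* file size knobs are exposed under the spark.databricks.delta prefix on the managed platform, so treat the exact keys and defaults as assumptions to verify for your runtime):

    # Compact only a subset of the table by restricting OPTIMIZE with a partition predicate
    spark.sql("OPTIMIZE events WHERE event_date >= '2023-01-01'")

    # Databricks-specific file size knobs for OPTIMIZE (assumed names; verify for your runtime)
    spark.conf.set("spark.databricks.delta.optimize.minFileSize", str(256 * 1024 * 1024))
    spark.conf.set("spark.databricks.delta.optimize.maxFileSize", str(1024 * 1024 * 1024))

    # Collect data-skipping statistics only on the first 5 columns of the table
    spark.sql("ALTER TABLE events SET TBLPROPERTIES ('delta.dataSkippingNumIndexedCols' = '5')")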

In the next section, we'll examine optimizations offered by managed platforms.

Other platform optimizations

The next few optimizations are provided on the managed platform offering on Databricks; however, some of these make their way into open source as well:

  • ZOrder, also known as multidimensional clustering, is a file layout technique to colocate related information together because the access patterns use those fields heavily in their where clauses:
    • To determine which columns to use for z-ordering, look for high-cardinality fields that frequently appear in the WHERE clause or in the JOIN criteria of queries. Typically, these are identifier fields, such as user ID, IP address, product ID, email address, and phone number. In conjunction with file skipping, this offers much better selectivity, and orders of magnitude can be shaved off a query on large datasets as the file statistics help to home in on just the relevant files. This is useful in almost all large-scale queries, such as cybersecurity analysis over several terabytes of data to detect intrusion patterns under stringent SLAs.
    • For example, date can be a partition field and peril_code can be a zorder field for an insurance claims dataset. Three or four columns is a good number to consider for z-ordering; using more than that makes it less effective. It is not an idempotent operation; however, if no new data has arrived since the last run, it is effectively a no-op.
    • The following is an example of applying zorder on select columns and, potentially, select partitions of a Delta table. It can be used in conjunction with partitioning, where partition columns are chosen from categorical and date fields of lower cardinality so that a substantial amount of data (at least 1 GB) sits within a partition to make its creation worthwhile:
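
A minimal sketch, using the insurance claims example above (the table and column names, including the claim_date partition column, are illustrative):

    # Z-order a subset of partitions so that files are clustered by peril_code,
    # allowing data skipping to prune files when queries filter on that column
    spark.sql("""
        OPTIMIZE claims
        WHERE claim_date >= '2023-01-01'
        ZORDER BY (peril_code)
    """)
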
  • Auto-optimize: This is particularly relevant for streaming workloads and is similar to optimize in the sense that it collates smaller files, except that it happens in flight, before the data hits the disk, for all write operations. There is an adaptive shuffle that happens just before the write operation, so fewer files need to be written. It can be set either at the table level or for all tables, as shown:
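
A minimal sketch (the table property and configuration names follow the Databricks auto-optimize convention; treat them as assumptions to verify for your runtime):

    # Enable optimized writes and auto-compaction for a single table
    spark.sql("""
        ALTER TABLE events SET TBLPROPERTIES (
            'delta.autoOptimize.optimizeWrite' = 'true',
            'delta.autoOptimize.autoCompact'   = 'true'
        )
    """)

    # Or make them the default for all newly created tables in the session
    spark.conf.set("spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite", "true")
    spark.conf.set("spark.databricks.delta.properties.defaults.autoOptimize.autoCompact", "true")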

In addition, if the requests are coming in at a high throughput, it is necessary to prevent hotspots. For example, in the context of an object store such as S3, a simple way to do this is to use random prefixes:
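
A minimal sketch of opting a table into randomized file prefixes (the delta.randomizeFilePrefixes and delta.randomPrefixLength property names are as documented for Databricks Delta; treat them, and the values shown, as assumptions to verify for your environment):

    # Spread data files across random prefixes instead of partition-based paths
    # so that a high write rate does not hammer a single S3 key range
    spark.sql("""
        ALTER TABLE high_throughput_events SET TBLPROPERTIES (
            'delta.randomizeFilePrefixes' = 'true',
            'delta.randomPrefixLength'    = '4'
        )
    """)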

  • Dynamic file pruning (DFP): DFP is a data-skipping technique that improves queries with selective joins on non-partition columns of Delta Lake tables. Static partition pruning is driven by filters, whereas dynamic file pruning is driven by joins and kicks in at runtime rather than at compile time, hence the name dynamic.

An example of this is a join where one side can push down predicate details so that far fewer files are scanned, because the other side has far fewer rows and can communicate that fact, making the total amount of data brought together for the join smaller:
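
A minimal sketch of the kind of selective join that dynamic file pruning accelerates (the table and column names are illustrative; the selective filter on the small dimension table is propagated at runtime to prune the fact table's files):

    # The filter on the small dimension table produces few rows; at runtime their
    # join keys are used to skip fact-table files whose min/max stats cannot match
    result = spark.sql("""
        SELECT f.order_id, f.amount, d.store_name
        FROM fact_sales f
        JOIN dim_stores d
          ON f.store_id = d.store_id
        WHERE d.region = 'EMEA'
    """)
    result.show()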

  • Bloom filter index: Each data file can have a single associated Bloom filter index that can tell you that a value is definitively not in the file, although false positives are possible. The index file is checked first, and the data file is read only if the index indicates that the file might match the filter. The index is an uncompressed Parquet file that contains a single row and is stored in the _delta_index subdirectory relative to the data file:
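
A minimal sketch using the CREATE BLOOMFILTER INDEX statement available on Databricks (the syntax, option names, and values here are assumptions to verify against your runtime's documentation; the table and column are illustrative):

    # Create a Bloom filter index on a high-cardinality lookup column so that
    # files that definitely do not contain a given device_id can be skipped
    spark.sql("""
        CREATE BLOOMFILTER INDEX ON TABLE events
        FOR COLUMNS (device_id OPTIONS (fpp = 0.1, numItems = 50000000))
    """)
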
  • Optimum file size: Files that are too small or too large are both bad for performance. If there are too many small files, more time is spent on file I/O operations than on actually reading data. If the files are too large, they may need to be split, or, if they are not splittable, the workers are not utilized well. It is important to set an optimum file size depending on your use case, as shown:
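
A minimal sketch (delta.targetFileSize is a Databricks-specific table property; treat the property name and the value as assumptions to verify for your runtime):

    # Ask the engine to aim for ~128 MB data files for this table, which suits
    # a workload dominated by selective point lookups better than 1 GB files
    spark.sql("ALTER TABLE events SET TBLPROPERTIES ('delta.targetFileSize' = '128mb')")
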
  • Low Shuffle Merge (LSM): Merge operations can be very expensive on account of the number of rows they touch. LSM improves performance by processing unmodified rows in a separate processing mode, instead of processing them together with the modified rows. So, the amount of shuffled data is reduced significantly, leading to improved performance:
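
Low Shuffle Merge is applied transparently by the platform; the statement itself remains a standard Delta MERGE. A minimal sketch (the table and column names are illustrative):

    # Upsert a batch of updates; with Low Shuffle Merge enabled, files whose rows
    # are untouched are carried over without being reshuffled
    spark.sql("""
        MERGE INTO customers AS t
        USING customer_updates AS s
          ON t.customer_id = s.customer_id
        WHEN MATCHED THEN UPDATE SET t.email = s.email, t.updated_at = s.updated_at
        WHEN NOT MATCHED THEN INSERT *
    """)
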
  • Optimize joins: AQE should be enabled by default as it performs a lot of optimizations on behalf of the user. There might still be scenarios where you need to specify hints, which both range join and skew join optimizations benefit from. The bin size is a numeric tuning parameter that splits the value domain of the range condition into multiple bins of equal size. For example, with a bin size of 10, the optimization splits the domain into bins that are intervals of length 10.

Data skew is a condition in which a table's data is unevenly distributed among partitions in the cluster and can severely downgrade the performance of queries, especially those with joins. A skew hint must contain at least the name of the relation with skew. A relation is a table, view, or subquery. All joins with this relation then use skew join optimization.

The bin size can be set explicitly or as part of the hint, as shown:
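
A minimal sketch (the RANGE_JOIN and SKEW hints and the rangeJoin.binSize configuration are Databricks-specific, and the table names are illustrative; treat the exact names as assumptions to verify for your runtime):

    # Set the default bin size for range join optimization explicitly
    spark.conf.set("spark.databricks.optimizer.rangeJoin.binSize", "10")

    # ...or pass the bin size as a hint on the query itself
    spark.sql("""
        SELECT /*+ RANGE_JOIN(points, 10) */ p.id, r.label
        FROM points p JOIN ranges r
          ON p.value >= r.start AND p.value < r.end
    """)

    # Declare a skewed relation so that skew join optimization is applied to it
    spark.sql("""
        SELECT /*+ SKEW('orders') */ o.order_id, c.name
        FROM orders o JOIN customers c ON o.customer_id = c.customer_id
    """)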

  • Delta caching: This creates copies of the cloud storage data locally on SSD disks on the worker nodes so that subsequent reads are faster; it is supported for the Parquet/Delta formats. Unlike the Spark cache, this process is fully transparent and does not require any action, and memory is not taken away from other operations within Spark. Eviction happens automatically in least recently used (LRU) fashion or on any file change, and the cache can be cleared manually by restarting the cluster:
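
A minimal sketch of enabling the cache explicitly (spark.databricks.io.cache.enabled is the Databricks configuration key; on instance types with local SSDs it is typically on by default, so treat this as an assumption to verify):

    # Turn the Delta/disk cache on for the current cluster session
    spark.conf.set("spark.databricks.io.cache.enabled", "true")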

The following diagram shows the position of Delta cache in an application data stack:

Figure 12.2 – Delta cache

Although the cache does not need to be explicitly invalidated or loaded, the CACHE SELECT command can be used to warm it up in advance. If the existing cached entries have to be refreshed, the REFRESH TABLE statement can be used, which is lazily evaluated.
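
A minimal sketch (CACHE SELECT is the Databricks warm-up command; the table and column names are illustrative):

    # Pre-warm the cache with the columns the dashboard queries actually use
    spark.sql("""
        CACHE SELECT event_date, region, amount
        FROM events WHERE event_date >= '2023-01-01'
    """)

    # Lazily refresh cached entries after the underlying data has changed
    spark.sql("REFRESH TABLE events")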

  • Optimize data transformations: Transformations on arrays and complex nested data types can be very expensive, especially when implemented with user-defined functions. Using Spark's built-in functions and higher-order primitives is recommended for array manipulation operations.
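
For example, here is a minimal sketch comparing a Python UDF with the built-in transform higher-order function (the table and column names are illustrative; the Python transform API requires Spark 3.1+):

    from pyspark.sql import functions as F

    df = spark.table("sensor_readings")  # assumed to have an array<double> column named "values"

    # Prefer the built-in higher-order function, which runs inside the engine...
    fast = df.withColumn("values_scaled", F.transform("values", lambda x: x * 1.8 + 32))

    # ...over a Python UDF, which serializes every row out to a Python worker
    scale_udf = F.udf(lambda xs: [x * 1.8 + 32 for x in xs], "array<double>")
    slow = df.withColumn("values_scaled", scale_udf("values"))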

In the next section, we will look at ways to make this process as automated as possible; programmatic APIs are the way to go about it.

Automation

For most read/write operations, the regular Spark APIs can be used. However, some operations are specific to Delta and require the DeltaTable APIs (refer to https://docs.delta.io/latest/api/python/index.html):

The DeltaTable class is the main entry point for interacting with the underlying data. All the data operations we've talked about, such as update, delete, and merge, can be executed using methods on this class. You can also use it to convert Parquet to Delta or to generate a manifest file so that processing engines other than Spark can read Delta.
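
A minimal sketch of the DeltaTable API (paths, table names, and predicates are illustrative; assumes the delta-spark package is installed and a SparkSession named spark is configured for Delta):

    from delta.tables import DeltaTable

    # Bind to an existing Delta table by name (DeltaTable.forPath works for paths)
    dt = DeltaTable.forName(spark, "events")

    # Fine-grained operations without rewriting whole partitions
    dt.delete("event_date < '2020-01-01'")
    dt.update(condition="amount IS NULL", set={"amount": "0"})

    # One-time conversion of an existing Parquet directory to Delta
    DeltaTable.convertToDelta(spark, "parquet.`/data/raw/events_parquet`")

    # Generate a manifest so engines such as Presto/Trino can read the table
    dt.generate("symlink_format_manifest")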

Is cost always inversely proportional to performance?

Typically, higher performance is associated with higher costs. Spark provides options for tuning both performance and cost. At a high level, it is a given that if your end-to-end latency requirement is stringent, that is, low, then your cost will be higher.

But using Delta to unify all your workloads on a single platform brings efficiencies of scale through automation and standardization. Costs come down because the number of hops and processing steps is reduced, which translates to a reduction in compute. Also, when your queries run faster on the same hardware, you pay for less cloud compute time. So yes, it is possible to improve performance and still contain cost, without compromising SLA requirements. Superior architecture options also become available, such as the unification of batch and streaming workloads, schema enforcement alongside schema evolution, and the ability to handle unstructured data right alongside traditional structured and semi-structured data. The simplification of pipelines, along with increased reliability and quality, leads to fewer outages and triages/fixes, freeing people from support tasks to work primarily on improving use case effectiveness.

It is important that when you are evaluating product architectures, you look at price-performance metrics. The cost of individual services can be misleading, so the cost of the entire pipeline should be considered for an effective comparison. Duration, failure rates, the ability to support current and future use cases, including critical workloads and their SLAs, and the engineering effort to build and maintain the pipelines should all be considered in the mix. At the end of the day, the only thing that matters is the value delivered from your data investment. Also, remember that not every pipeline needs to be as fast as possible; it should match the consumption pattern. If there is no one to consume data at the other end, then it is wasteful to refresh it too frequently.

The areas to consider include the following:

  • Ease of setup
  • Ease of use
  • Ease of extensibility and integrations
  • Ease of migration to another location
  • Ease of moving from development mode to production
  • Ease of moving from low-frequency to high-frequency scheduling
  • Ability to support both data and ML pipelines along with REST APIs for CI/CD automation and observability

The metrics to consider can be summarized as follows:

  • Total cost of running the entire pipeline
  • Time duration from end to end to get the data ready and the access patterns to consume it
  • Productivity gains for the engineering and business teams
  • Extensibility of the architecture to future-proof for unknown use cases
  • Complexity involved in the creation and maintenance
  • Portability of the solution so you do not sign up for an expensive vendor lock-in situation

In the next section, we will summarize some of the best practices to get the best performance.

Best practices for managing performance

Managing cost and performance is a continuous activity. Sometimes, they can inversely affect each other, and other times, they go hand in hand. Once optimized, a workload pattern can change and need a different set of tweaks. That said, managed platforms, such as Databricks, are getting better at analyzing workloads and suggesting optimizations or directly applying them, thereby relieving the data engineer from these responsibilities. But there is still a long way to go to reach complete auto-pilot. We covered a lot of different techniques to tune your workloads; partition pruning and I/O pruning are the main ones:

  • Partition pruning: This is achieved by laying out data in a directory per partition value, which on disk looks like <partition_key>=<partition_value> with a set of associated Parquet data files. If the amount of data pulled from the executors back to the driver is large, use spark.driver.maxResultSize to increase the limit. A table may also suffer from too many files, which slows down write operations; use optimize, optimized writes, and auto-compaction to address this, and examine the Delta transaction log to understand the file layout.
  • I/O pruning: Data skipping and ZOrdering help with the better management of the granularity of file size and avoiding skews in data file size.
  • Others: Choose the cluster configuration judiciously. Add a TTL policy or disable cloud storage versioning to avoid accumulating unnecessary storage, and set spark.sql.shuffle.partitions to the number of executor cores. Use Delta caching to cache data locally and avoid going back to cloud storage. This is especially useful in search applications where there is high query reuse or only small changes in filter conditions.

Here are some common best practices to follow:

  • Employ the right compute type.
  • Optimize based on the need of the workload under consideration - storage versus compute versus memory.
  • Once a family type is chosen, start with the lowest configuration and understand the bottlenecks before throwing larger node types at the problem.
  • Depending on SLAs, choose a reserved instance for the driver and a combination of reserved and spot instances for workers as spot instances can greatly reduce cost.
  • Benchmark to size the cluster but allow for auto-scaling (both up and down).
  • In most scenarios, these settings should be applied (see the configuration sketch after this list):
    • Turn on AQE (spark.sql.adaptive.enabled).
    • Turn on coalesce partitions (spark.sql.adaptive.coalescePartitions.enabled).
    • Turn on skew join (spark.sql.adaptive.skewJoin.enabled).
    • Turn on local shuffle reader (spark.sql.adaptive.localShuffleReader.enabled).
    • Set the broadcast join threshold (spark.sql.autoBroadcastJoinThreshold).
    • Avoid sort-merge joins (spark.sql.join.preferSortMergeJoin = false).
  • Build pipelines that capture dependencies of all associated tasks so that downstream tasks are auto-triggered when relevant, dependent upstream tasks are complete.
  • Review and refine code so that it is as lean and mean as possible. Parallelize wherever possible. Look for the five Ss in the Spark UI and other monitoring dashboards and address them early on. Set up alerts and notifications when a job takes too long or fails.
  • Build all pipelines to not just use Delta but streaming as well – the dial can easily be changed to move from less frequent to more frequent to continuous streaming ingestion mode.
  • Plan to execute scheduled maintenance tasks, such as optimize and vacuum.
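
The settings listed above can be applied in one place when the session is created; a minimal sketch follows (the values are illustrative starting points, not recommendations for every workload):

    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
    spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
    spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(64 * 1024 * 1024))  # 64 MB
    spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")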

Here are some tips for making sound decisions around your data layout and operations to maintain data hygiene:

  • What partitioning strategy should you use?

Which column(s) to use for partitioning is a common design consideration. If the table is small to modest in size, it may be best not to partition at all. However, if you do partition, you should take into consideration the access patterns and the columns commonly used in where clauses. For example, partitioning by date is a common strategy. The rule of thumb is to not use a high-cardinality column, such as an identifier. However, if the data in each partition is expected to be about 1 GB, then it is a fair candidate.

  • How do you compact files?

Writing data in microbatches over time creates small files that adversely affect performance. Compacting using repartition or optimize will change the data layout and coalesce the data into fewer files. In both cases, you can do it for the entire table or by partition. In the case of repartition, the older files will need to be removed by running a vacuum command, as shown in the sketch below.
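
A minimal sketch of the repartition route (the path and file count are illustrative; the dataChange=false option tells Delta that the rewrite does not alter the data, so downstream streaming readers are not disturbed):

    path = "/data/delta/events"

    (spark.read.format("delta").load(path)
          .repartition(16)                      # target number of larger files
          .write.format("delta")
          .mode("overwrite")
          .option("dataChange", "false")        # pure layout change, same data
          .save(path))

    # Old, now-unreferenced files still need to be cleaned up afterwards
    spark.sql(f"VACUUM delta.`{path}` RETAIN 168 HOURS")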

  • How do you replace content or a schema gracefully?

There may be instances where business logic changes and you need to replace a Delta table. You may be tempted to delete the directory and recreate it, or to create a new one. This is not a good idea: not only is deleting files slow, but it also runs the risk of data corruption. A better approach is to use overwrite mode to replace the data, with the overwriteSchema option set to true if the schema changes, or to fix a few values using Delta's inherent capability for fine-grained updates and deletes. A sketch follows.
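
A minimal sketch of replacing the contents and schema in place (the table name and sample data are illustrative):

    # new_df represents the recomputed data, possibly with a changed schema
    new_df = spark.createDataFrame(
        [(1, "hail", 1200.0), (2, "flood", 5300.0)],
        "claim_id INT, peril_code STRING, amount DOUBLE",
    )

    (new_df.write.format("delta")
           .mode("overwrite")
           .option("overwriteSchema", "true")   # allow the schema to be replaced too
           .saveAsTable("claims"))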

  • What kind of caching should you use?

When using Delta, it is advisable not to use Spark caching, as any additional data filter will cause it to go back to disk, as will accessing the data using a different identifier. Delta caching at the VM level should be employed where possible, as the dataset itself is stored locally. As the data in cloud storage changes, the cache is invalidated, so you do not have to worry about accessing stale data.

The primary performance levers are listed in the following diagram:

Figure 12.3 – Performance levers

We have explored performance from several dimensions, including cost. The best way is to write queries, examine the query plan, optimize, explore, and learn iteratively.

Summary

As data grows exponentially over time, query performance is an important ask from all stakeholders. Delta is based on the columnar Parquet format, which is highly compressible, consuming less storage and memory, and Delta automatically collects and maintains file-level statistics that act as a lightweight index on the data. Data skipping provides faster access by using these file statistics so that only the relevant files are read, avoiding full scans. Delta caching improves the performance of common queries that repeat. optimize compacts smaller files and zorder colocates related data that is usually queried together, leading to fewer file reads.

The Delta architecture pattern has empowered data engineers not only by simplifying a lot of their daily activities but also by improving query performance for the data analysts who consume the output of these upstream data engineers' hard work. In this chapter, we looked at some common techniques to apply to our Delta tables to make them perform better for data analyst queries. In the next chapter, we will look at admin-related functionalities, particularly around user management and data access control, so the governance pillar will be the final feather in the cap of the data journey, along with some data migration plays.
