Chapter 6: Solving Common Data Pattern Scenarios with Delta

"Without changing our pattern of thought, we will not be able to solve the problems we created with our current pattern of thoughts"

– Albert Einstein

In the previous chapters, we established the foundation of Delta: how it helps to consolidate disparate datasets and how it offers a wide array of tools to slice and dice data using unified processing and storage APIs. We examined basic Create, Retrieve, Update, Delete (CRUD) operations using Delta, along with time travel capabilities that let us rewind to a view of the data at a previous point in time for rollback purposes. We used Delta to showcase fine-grained updates and deletes and the handling of late-arriving data, which may arise on account of a technical glitch upstream or a human error. We also demonstrated the ability to adapt to a changing schema. This happens when the underlying application is upgraded: new fields are introduced, a few of the old ones change, or one or two are dropped because they are no longer considered useful. Delta simplifies all these operations, and now we will leverage them to build bigger reusable patterns.

There are common data operations that we have been addressing in different ways for a long time, but with every technology advancement, we adopt a different approach to solving the same problems to squeeze out an extra ounce of performance and efficiency. In this chapter, we will create reusable code building blocks to tackle everyday data processing needs, so that these modular patterns can be applied again and again across a wide variety of use cases in different industry verticals. In a small application, or while doing a Proof of Concept (POC), it is, of course, very easy to rip and replace. But for a large-scale enterprise data pipeline, changes have to be made very carefully, as the ramifications are widespread; the analogy is that of maneuvering a speedboat versus a large ship. Moving data is always error-prone, so once it is curated, we will look at strategies to minimize data movement. There are some cases where it is necessary to create a backup or a clone of the curated data in a different region or location, and it is important to understand the distinction between these scenarios. Adding data to the lake is always simpler than managing changes to data at scale. In this chapter, we will look into solving some of these problems with Delta, namely the following:

  • Understanding use case requirements
  • Minimizing data movement with Delta time travel
  • Data cloning
  • Handling Change Data Capture (CDC)
  • Handling Slowly Changing Dimensions (SCD)

Technical requirements

To follow the instructions of this chapter, make sure you have the code and instructions as detailed in this GitHub location:

https://github.com/PacktPublishing/Simplifying-Data-Engineering-and-Analytics-with-Delta/tree/main/Chapter06

Examples in this book cover some Databricks-specific features to provide a complete view of capabilities. New features continue to be ported from Databricks to the open source Delta.

Let's get started!

Understanding use case requirements

Each problem that a client brings up will have some similarities to a problem you may have seen before, and yet some nuances that make it a little different. So, before rushing to reuse a solution, you need to understand the requirements and the priorities so that they can be handled in the order of importance the client assigns to them. A good way to look at requirements is by demarcating the functional ones from the non-functional ones. Functional requirements specify what the system should do, whereas non-functional requirements describe how the system should perform. For example, we may be able to perform fine-grained deletes from the enterprise data lake for a GDPR compliance requirement, but if it takes two days and two engineers to do so at the end of each month, it will not meet a 12-hour SLA. The technical capability exists, but the solution is still not usable. The following diagram helps you classify the requirements into these two groups, as they will influence your choice of trade-offs as you build the solution.

Figure 6.1 – Functional and non-functional requirements

In Chapter 5, Data Consolidation in Delta Lake, we emphasized the need to keep data as a single source of truth (SSOT) and build multiple disparate use cases on a single view of the data; this is because every time you move data, there are plenty of opportunities to make errors and compromise its quality, not to mention the extra cost and operational burden. In the next section, we will first look at a few techniques to minimize data movement and then explore scenarios where it is absolutely necessary.

Minimizing data movement with Delta time travel

Apart from ensuring data quality, the other advantage of minimizing data movement is that it reduces the costs associated with storing and processing that data. To avoid stitching together fragile, disparate systems, the first core requirement is to keep data in an open format that multiple tools of the ecosystem can handle, which is what Delta architectures promote.

There are some scenarios where a data professional needs to make copies of an underlying dataset. For example, a data engineer running a series of A/B tests needs a point-in-time reference to a data snapshot to compare against for debugging and integration testing purposes. A BI analyst may need to run different reports off the same data to perform some audit checks. Similarly, an ML practitioner may need a consistent dataset because experiments have to be compared across different ML model architectures or different hyperparameter combinations to make a model selection. Before anyone realizes, too many copies have been made and it is difficult to keep track of the original version. Another person decides to collaborate in your efforts and, not wanting to step on your toes, ends up creating yet another set of copies. Creating new datasets also means granting access to them and remembering to take that access away after the test has concluded. Few people remember to do that, leading to data exposure situations. Cleaning up these files and their associated tables is risky, so people shy away from it, which results in all these temporary copies staying around forever and causing data governance challenges. In the next few sections, we will see whether it is possible to do rollbacks, audit data changes over the last few days, and have a static view of data even though the underlying table is in constant flux, with data not only being added but also modified.

Every data operation on a Delta file or table format automatically gets a timestamp and version number. Users can then use 'timestampAsOf' or 'versionAsOf' in Scala, Python, or SQL to look at a snapshot of the data:

  • Python syntax (see the first half of the sketch below)
  • SQL syntax (see the second half of the sketch below)
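A minimal sketch of both forms follows; the events table name and the /delta/events path are assumptions, and the SQL time travel clause requires Databricks or a recent Delta release:

# Python syntax: read a snapshot through DataFrameReader options
version_df = (spark.read.format("delta")
              .option("versionAsOf", 3)
              .load("/delta/events"))

timestamp_df = (spark.read.format("delta")
                .option("timestampAsOf", "2021-11-01 00:00:00")
                .load("/delta/events"))

# SQL syntax: the same snapshots expressed through Spark SQL
version_sql_df = spark.sql("SELECT * FROM events VERSION AS OF 3")
timestamp_sql_df = spark.sql(
    "SELECT * FROM events TIMESTAMP AS OF '2021-11-01 00:00:00'")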

Changes can be audited by looking at the history of the Delta table, as demonstrated here:

DESCRIBE HISTORY <delta table>

or

DESCRIBE HISTORY delta.`<path to delta data>`

Apart from other metadata details, such as the user, the number of files affected, and the operation performed (such as an insert/update/delete/merge), the history output includes the version and timestamp values that we used in the previous commands. When you run SELECT * against a Delta table, it retrieves data from the latest version. To find the latest version number or timestamp, you can use the following:

sql_s = "SELECT max(version) FROM (DESCRIBE HISTORY <delta table>)"
version = spark.sql(sql_s).collect()

Delta time travel can help fix an inadvertent operational error where some data accidentally got deleted or updated, as shown in the following examples:
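As a minimal sketch (the customers table name and version number 5 are assumptions, and RESTORE requires Databricks or a recent Delta release), an accidental delete can be undone either by restoring the whole table or by re-inserting just the missing rows from an earlier snapshot:

# Option 1: roll the entire table back to a known good version
spark.sql("RESTORE TABLE customers TO VERSION AS OF 5")

# Option 2: re-insert only the rows that were accidentally deleted
spark.sql("""
  INSERT INTO customers
  SELECT * FROM customers VERSION AS OF 5
  WHERE customer_id NOT IN (SELECT customer_id FROM customers)
""")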

The following is an example of merging changes when a condition is matched:
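A minimal sketch using the DeltaTable Python API; the customers table and the updates_df DataFrame of incoming changes are assumptions made for illustration:

from delta.tables import DeltaTable

# Incoming changes, for example freshly landed data (the path is an assumption)
updates_df = spark.read.format("delta").load("/delta/customer_updates")

customers = DeltaTable.forName(spark, "customers")

(customers.alias("tgt")
    .merge(updates_df.alias("src"), "tgt.customer_id = src.customer_id")
    # update only when the matched row actually changed
    .whenMatchedUpdateAll(condition="tgt.city <> src.city")
    .whenNotMatchedInsertAll()
    .execute())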

An ML practitioner may collaborate with others to run a series of experiments for two weeks on a given dataset, and they all need to know the version of data to use for their base training, test, or validation datasets. This will help them compare results fairly and justify their choice of the winning model.

A BI analyst is always looking for patterns and trends in data. Temporal data analysis, such as seeing how many new sales were made in the last day or week, can easily be done using the same time travel capabilities without hard-coding exact day/time values, which makes it possible to automate such workloads.
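As a small sketch (the /delta/sales path is an assumption), yesterday's timestamp can be computed programmatically instead of being hard-coded:

from datetime import datetime, timedelta

yesterday = (datetime.utcnow() - timedelta(days=1)).strftime("%Y-%m-%d %H:%M:%S")

sales_now = spark.read.format("delta").load("/delta/sales")
sales_yesterday = (spark.read.format("delta")
                   .option("timestampAsOf", yesterday)
                   .load("/delta/sales"))

# Approximate number of sales rows added over the last day
new_sales = sales_now.count() - sales_yesterday.count()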

In the next section, we will look at the cloning functionality of Delta, which has various applications for data engineers, data scientists, and analysts alike by giving them the ability to back up and refresh data across environments or reproduce an experiment with fidelity on a copy of the data in a different region. It is important to distinguish between time travel and cloning capabilities. In time travel, you are rewinding to a different version or timestamp of the dataset, whereas in cloning, you are making the data available in a different setup or environment.

Delta cloning

Cloning is the process of making a copy. In the previous section, we started out by saying that we should try to minimize data movement and data copies whenever possible because there will always be a lot of effort required to keep things in sync and reconcile data. However, there are some cases where it is inevitable for business requirements. For example, there may be a scenario for data archiving, reproducing an MLflow experiment in a different environment, short-term experimental runs on production data, the need to share data with a different line of business (LOB), or the need to tweak a few table properties without affecting the original source, especially if there are consumers leveraging it with certain assumptions.

Shallow cloning refers to copying metadata and deep cloning refers to copying both metadata and data. If shallow cloning suffices, it should be preferred as it is light and inexpensive, whereas deep cloning is a more involved process.

Figure 6.2 – Shallow versus deep cloning in Delta

The previous figure shows the two paths for copying data and metadata for the two scenarios of shallow and deep cloning between a primary and a secondary datastore. It should be noted that after the clone operation, changes to the secondary do not affect the primary. The following table highlights the differences even further with guidance on when to use each.

Figure 6.3 – Shallow versus deep cloning
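As a minimal sketch of the syntax (the table names are assumptions; SHALLOW CLONE is available in recent open source Delta releases, while DEEP CLONE is a Databricks capability at the time of writing):

# Shallow clone: copies only the table metadata; data files still point back to the source
spark.sql("CREATE OR REPLACE TABLE sales_dev SHALLOW CLONE sales")

# Deep clone: copies both metadata and data files to the secondary table
spark.sql("CREATE OR REPLACE TABLE sales_backup DEEP CLONE sales")

# A clone can also target a specific snapshot by combining it with time travel
spark.sql("CREATE OR REPLACE TABLE sales_snapshot DEEP CLONE sales VERSION AS OF 10")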

You can combine cloning with time travel capabilities to copy a certain snapshot of data. In the next section, we will look at two closely related operations – Change Data Capture (CDC) and Slowly Changing Dimensions (SCD) – one addresses the flow of data from the perspective of the source, the other from the perspective of the target table.

Handling CDC

CDC is a process that classifies incoming records in real time to determine which ones are brand new, which ones are modifications of existing data, and which ones are requests for deletes. Operational data stores capture transactions in OLTP systems continuously and stream them across to OLAP systems. These two data systems need to be kept in sync to reconcile data and maintain data fidelity. It is like a replay of the operations, but on a different system.

CDC

This is the flow of data from the OLTP system into an OLAP system, typically into the first landing zone, which is referred to as the bronze layer in the medallion architecture. Several tools, such as GoldenGate from Oracle or PowerExchange from Informatica, support the generation of change sets, or the change sets could be generated by other relational stores that capture this information with a data modification trigger. Moreover, this could be an omni-channel scenario where the same type of data comes from different data sources and needs to be merged into a common target table. The changes could arrive as a continuous stream or as occasional ad hoc updates. Usually, the amount of incoming data is small compared to the target tables it is intended for. Also, there may be numerous change datasets. The following diagram provides a reference architecture of change datasets landing at a defined refresh interval, getting ingested into a staging table, and finally being merged into the target table.

Figure 6.4 – CDC flow for change datasets

The change dataset holds at least four pieces of critical information: the timestamp of the event, the type of operation (insert/update/delete), the unique key that can be joined with the target table to determine whether the record already exists, and the new value. There are two variations to the architecture:

  • The first approach involves a temporary staging table where the change dataset lands as an insert, followed by an insert overwrite type of operation into the target table. This suits, for example, a continuously streaming setup where inserts arrive from a cloud storage location or a Kafka topic. The inner condition ensures we pick the latest record if multiple arrive for the same key. If it is a delete, it is suppressed altogether, so the staging table closely matches an SCD Type 2 table and the final table resembles an SCD Type 1 table. We will cover SCD in the Handling Slowly Changing Dimensions (SCD) section of this chapter.
  • The second approach involves an INSERT or INSERT OVERWRITE into the staging table with a partition on the date/time field, followed by a MERGE INTO for the target table.

Additional partition columns on date/time fields help ensure that operations are efficient and work off only the relevant datasets instead of doing a full table scan, which, for a large table, is a very expensive operation:
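As a minimal sketch of the second approach (table and column names such as staging_table, target_table, record_key, op_type, value, and event_date are assumptions), the merge restricts itself to the most recent date partitions and keeps only the latest change per key:

spark.sql("""
  MERGE INTO target_table AS tgt
  USING (
    -- keep only the most recent change per key from the latest partitions
    SELECT record_key, op_type, value, event_timestamp, event_date
    FROM (
      SELECT s.*,
             row_number() OVER (PARTITION BY record_key
                                ORDER BY event_timestamp DESC) AS rn
      FROM staging_table s
      WHERE event_date >= date_sub(current_date(), 1)
    ) ranked
    WHERE rn = 1
  ) AS src
  ON tgt.record_key = src.record_key
     AND tgt.event_date >= date_sub(current_date(), 1)  -- partition pruning on the target
  WHEN MATCHED AND src.op_type = 'DELETE' THEN
    DELETE
  WHEN MATCHED THEN
    UPDATE SET tgt.value = src.value,
               tgt.event_timestamp = src.event_timestamp
  WHEN NOT MATCHED AND src.op_type != 'DELETE' THEN
    INSERT (record_key, value, event_timestamp, event_date)
    VALUES (src.record_key, src.value, src.event_timestamp, src.event_date)
""")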

Whenever possible, it is a good idea to use partition columns for partition pruning and to Z-order the columns used in the WHERE clause of consumption queries, so that the query engine reads the target tables more efficiently. Running the VACUUM operation on the target table at periodic intervals helps to purge older versions (that is, older copies of the data files). On the staging table, a simple overwrite from the target table can help control additional data bloat so that all records with a rank greater than one are removed.
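The first two of these housekeeping steps might look like the following sketch; the table and column names are assumptions, and OPTIMIZE with ZORDER is a Databricks feature at the time of writing:

# Co-locate data files by the columns most frequently used in WHERE clauses
spark.sql("OPTIMIZE target_table ZORDER BY (record_key)")

# Remove data files referenced only by versions older than the default retention (7 days)
spark.sql("VACUUM target_table")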

If CDC is attempted on a large number of tables, it is recommended to use a generic pipeline with a configuration-driven approach to scale out to the different datasets, which may vary by parameters such as event name, timestamp, matching key field, partition column, and so on. A simple schema of table name, partition column, rank criteria, and strategy (insert versus update) can help to abstract out the code and configuration aspects and go a long way with the debugging and maintenance of the numerous pipelines. Any improvement to the base code of the pipeline will benefit all the pipelines.
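A sketch of such a configuration, driving a single generic pipeline, could look like the following; the field names and the apply_cdc helper are hypothetical:

# Hypothetical configuration describing each CDC-managed table
cdc_config = [
    {
        "table": "orders",
        "partition_column": "event_date",
        "key": "order_id",
        "rank_by": "event_timestamp",
        "strategy": "merge",     # merge into the target table
    },
    {
        "table": "customers",
        "partition_column": "event_date",
        "key": "customer_id",
        "rank_by": "event_timestamp",
        "strategy": "insert",    # append-only target
    },
]

for cfg in cdc_config:
    # apply_cdc is a hypothetical helper that parameterizes the MERGE shown earlier
    apply_cdc(spark, cfg)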

Change Data Feed (CDF)

This is the continuation of the flow of CDC data from the landing zone Delta tables downstream to the curated silver and aggregated gold zones. It is worth pointing out that time travel and data versioning operate at the file level and not at the individual row level. CDF addresses issues around quality control and inefficiencies in a big data ecosystem. Every new ingestion round brings in new data, and tracking these row-level changes across dataset versions is hard to do by hand. Efficiency is paramount in the context of real-time systems, and it is hard to justify the time taken to scan whole datasets just to work out which rows need to be merged.

CDF can be enabled at a cluster level or a table level. Additionally, it can be enabled on a brand new table (during the CREATE operation) or on an existing table (via a subsequent ALTER TABLE that sets the table property). Once enabled, the feature is forward-looking, meaning changes ingested before it was enabled are not captured. The following example shows its use during new table creation:
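A sketch, assuming a silver-layer table named silver_customers; the first statement matches the new-table case, and the other two show the alternatives mentioned above:

# Enable the change data feed on a brand new table
spark.sql("""
  CREATE TABLE silver_customers (customer_id INT, city STRING)
  USING DELTA
  TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# Enable it on an existing table
spark.sql("""
  ALTER TABLE silver_customers
  SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# Enable it by default for all new tables created by this cluster/session
spark.conf.set(
    "spark.databricks.delta.properties.defaults.enableChangeDataFeed", "true")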

The changes can be queried using the table_changes function either by stating a starting version or a starting and ending version as follows:
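For example, assuming the silver_customers table from the previous sketch and versions 2 and 5 (the table_changes function is available in Databricks SQL; open source Delta exposes the same data through the readChangeFeed reader option):

# All changes from version 2 onward
changes = spark.sql("SELECT * FROM table_changes('silver_customers', 2)")

# Changes between versions 2 and 5 (inclusive)
changes_range = spark.sql("SELECT * FROM table_changes('silver_customers', 2, 5)")

# Each row carries the metadata columns _change_type, _commit_version, and _commit_timestamp
changes.select("customer_id", "_change_type", "_commit_version", "_commit_timestamp").show()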

An inner join of the silver zone table with the table_changes output helps identify the changed rows; aggregations for the gold zone tables can then be done selectively on those rows to improve the efficiency of operations:
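A sketch with assumed silver and gold table names (silver_customers and gold_customers_by_city), recomputing aggregates only for the cities that changed since version 2:

# Cities touched by any change since version 2
spark.sql("""
  SELECT DISTINCT city
  FROM table_changes('silver_customers', 2)
  WHERE _change_type IN ('insert', 'update_postimage', 'delete')
""").createOrReplaceTempView("changed_cities")

# Re-aggregate only those cities and upsert the results into the gold table
spark.sql("""
  MERGE INTO gold_customers_by_city AS tgt
  USING (
    SELECT s.city, count(*) AS customer_count
    FROM silver_customers s
    INNER JOIN changed_cities c ON s.city = c.city
    GROUP BY s.city
  ) AS src
  ON tgt.city = src.city
  WHEN MATCHED THEN UPDATE SET tgt.customer_count = src.customer_count
  WHEN NOT MATCHED THEN INSERT (city, customer_count) VALUES (src.city, src.customer_count)
""")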

It is worth noting that CDF operations, though highly optimized, are nevertheless an additional step and there are scenarios where they are a better fit than others. For example, if data coming in is always additive, resulting in an append-only mode, then additional checks of CDF are not really necessary. If most of the data is updated on every operation, then a direct MERGE operation suffices. If the source system has curated data in CDC format, then it is a perfect fit for the CDF data pattern, especially when these changes need to be transferred to downstream tables. The kill-and-fill scenario where entire data dumps are replaced is obviously not a fit.

To summarize, CDF makes handling data changes in Delta tables easy and efficient. The audit trail that it provides by itself is a huge benefit and it can be saved to cloud storage for future analysis. The ultimate use case is materialized views that provide real-time updates to BI dashboards by efficiently processing only the changed rows, instead of reprocessing entire tables every time a change comes through. In addition to Delta tables, streaming services such as Kafka and other data stores can also be a target for these change sets. In the next section, we will discuss a related concept – Slowly Changing Dimensions (SCD) – which is viewed from the target table's perspective.

Handling Slowly Changing Dimensions (SCD)

Operational data makes its way into OLAP systems that comprise fact and dimension tables. The facts change frequently and are usually additive in nature. The dimensions do not change as often but they do experience some change, hence the name "slowly changing dimensions."

Business rules dictate how this change is to be handled and the various types of SCD operations reflect this. The following table lists them.

Figure 6.5 – SCD types

Of all these alternatives, types 1 and 2 are the most popular in the industry. In the next section, we will explore them in more detail.

SCD Type 1

This is fairly straightforward as there is no need to store the historical data; the newer data just overwrites the older data. Delta's MERGE constructs come in handy. There is an initial full load of the data. New data is inserted, existing data is updated, and deletes remove the data altogether.

Figure 6.6 – SCD 1 – initial load

This is a visualization of the state of the target dimension table on the initial load of data the very first time we perform an ingestion operation. The subsequent operations are more interesting because decisions have to be made regarding which rows will be additions and which rows will require modification.

Figure 6.7 – SCD 1 – handling new data and changes to old data

In the following example, we are picking up new data from the bronze table (src) and merging it into the target table (tgt) by joining on the user identifier, only if there has been a change in the city field. If a matching record is found in the target table, an update operation is performed; otherwise, the row is regarded as a new record and inserted. Joining on user_id is a must, whereas adding the city check is an optimization: if you received the same data dump every day, the target table would be left untouched. A count of the number of rows updated, inserted, or deleted makes this point clearer.
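A minimal sketch of this merge, assuming src and tgt are registered Delta tables that both carry user_id and city columns:

spark.sql("""
  MERGE INTO tgt
  USING src
  ON tgt.user_id = src.user_id
  WHEN MATCHED AND tgt.city <> src.city THEN
    UPDATE SET tgt.city = src.city
  WHEN NOT MATCHED THEN
    INSERT (user_id, city) VALUES (src.user_id, src.city)
""")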

In the next section, we will discuss SCD Type 2, which is a very popular pattern as it retains older history.

SCD Type 2

Much like in SCD Type 1, there is an initial load, and any new data that comes later is added as new records. However, unlike SCD Type 1, any change to existing data manifests as a new row. New columns around effective dates and recency are added to manage the change. Start and end dates indicate the validity period of the data and help when going back in time. A current Boolean field indicates whether that row is the most recent version or not. All of this should be done as a single operation in order not to introduce race conditions, and this is where Delta's ACID properties come in handy.

The ability to do "as of" analytics is powerful. It allows you to rewind in time to recreate the view of data and understand exactly what the state of the world was then. Let's look at an initial load and subsequent ingestion to a dimension table.

Figure 6.8 – SCD 2 – initial load

As in SCD Type 1, the very first ingestion is straightforward: all the rows are inserted as is, but do take note of the additional date fields and the Boolean field indicating whether a row is currently active. As expected, everything is set to True. On subsequent ingestions, the start and end dates get adjusted, and the older values are set to False to indicate that they are no longer current.

Figure 6.9 – SCD 2 – subsequent load

We looked at the star schema in the previous chapters when visualizing a fact table with several dimension tables around it. SCD Type 2 is a classic data warehouse problem that can be solved elegantly using Delta. The relationships between the fact and dimension tables remain unchanged; what changes is the validity period of the data in the dimension table, as it relates to the "as of" state of the world and is controlled by the date/time ranges. At any time, there is only a single active or current record of the dimensional data, to avoid confusion.

The same example that we used for SCD Type 1 is used again to demonstrate SCD Type 2. There are several ways to do this; the one demonstrated uses a temporary src_changes view, which in turn is a union of two other temporary tables, src_bronze_table and inserts_for_matched_changes. Now, user_id is used to create a new field called merge_key. Because the changed records are duplicated, a second set with a null merge_key is also added. The subsequent merge operation treats matched records as old records: their end_date is updated to the update date and their current_indicator is set to false. A brand new record is inserted on a 'when not matched', with current_indicator set to true and the end date left as null. Some variations of SCD Type 2 use a very large time value for this end date instead. Both approaches are fine:
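The following sketch follows that recipe; all table and column names (src_bronze_table, tgt_dim, user_id, city, start_date, end_date, current_indicator) are assumptions, and current_date() stands in for the update date:

# Rows whose key already exists in the target with a changed city; these will be
# inserted as brand new current rows (their merge_key is NULL so they never match)
spark.sql("""
  CREATE OR REPLACE TEMP VIEW inserts_for_matched_changes AS
  SELECT NULL AS merge_key, src.*
  FROM src_bronze_table src
  JOIN tgt_dim tgt
    ON src.user_id = tgt.user_id
  WHERE tgt.current_indicator = true
    AND src.city <> tgt.city
""")

# All incoming rows keyed by user_id, plus the duplicated changed rows from above
spark.sql("""
  CREATE OR REPLACE TEMP VIEW src_changes AS
  SELECT src.user_id AS merge_key, src.*
  FROM src_bronze_table src
  UNION ALL
  SELECT * FROM inserts_for_matched_changes
""")

# Close out the old version of changed rows and insert new/changed rows as current
spark.sql("""
  MERGE INTO tgt_dim AS tgt
  USING src_changes AS src
  ON tgt.user_id = src.merge_key AND tgt.current_indicator = true
  WHEN MATCHED AND tgt.city <> src.city THEN
    UPDATE SET tgt.current_indicator = false,
               tgt.end_date = current_date()
  WHEN NOT MATCHED THEN
    INSERT (user_id, city, start_date, end_date, current_indicator)
    VALUES (src.user_id, src.city, current_date(), NULL, true)
""")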

SCD Type 2 may seem long-winded, but once adopted and implemented, it is boilerplate code that can be reused for multiple tables by building a simple configuration-driven framework to cater to additional tables.

Summary

Delta Lake with ACID transactions makes it much easier to reliably perform UPDATE and DELETE operations. Delta introduces the MERGE INTO operator to perform Upsert/Merge actions as atomic operations along with time travel features to provide rewind capabilities on Delta Lake tables. Cloning, CDC, and SCD are patterns found in several use cases that build upon these base operations. In this chapter, we have looked at these common data patterns and shown how Delta continues to provide efficient, robust, and elegant solutions to simplify the everyday work scenarios of a data persona, allowing them to focus on the use case at hand.

In the next chapter, we will look at data warehouse use cases and see if all of them can be accommodated in the context of a data lake. We will reflect on whether there is a better architecture strategy to consider instead of just shunting between warehouses and lakes.
