© The Author(s), under exclusive license to APress Media, LLC, part of Springer Nature 2022
A. Morton, Mastering Snowflake Solutions, https://doi.org/10.1007/978-1-4842-8029-4_2

2. Data Movement

Adam Morton, Sydney, NSW, Australia

In this chapter, you will look at the different ways in which you can make data available within the Snowflake platform. First, you’ll examine the supported file locations and common patterns you see in the field, before moving on to look at the COPY INTO command and some useful parameters. You’ll learn about important considerations when planning to load data, such as how best to prepare files for loading, the use of dedicated warehouses, and ways to partition the data for efficient loading. You’ll also take a detailed look at streams and tasks. Streams allow you to track changes from a source table, providing an easy way to identify changes in the source data. Tasks allow you to control the execution of your pipelines so they run in the right order.

The frequency of incoming data also has a strong influence on the approach you’ll adopt. Ingesting data in near real time requires a different approach to bulk loading data, for example. You’ll meet Snowpipe, a service Snowflake provides to support continuous data loading.

Throughout this chapter, I’ll share my own experiences about using these approaches in the real world. I’ll provide you with tips and highlight key patterns which will hopefully save you a lot of time.

The chapter ends by bringing all the concepts you learned into a practical example with SQL scripts.

Stages

There’s a lot to think about before you can even get started writing your data pipelines. Firstly, you need to consider where best to stage your data. In Snowflake, a stage is an area to rest your data files prior to loading them into a table.

There are two primary types of stages:
  • External stages

  • Internal stages

External Stages

When you create an external stage in Snowflake, you can think of it like a pointer to a third-party cloud storage location. This can be Amazon S3, Google Cloud Storage, or Microsoft Azure regardless of what platform you run your Snowflake account on. So, for example, you can run your Snowflake account on Amazon and create an external stage on Google Cloud Storage or Microsoft Azure.

When you create an external stage in Snowflake, it creates an object within the selected schema. This object holds the URL along with any required settings to access the cloud storage location. You can also associate a file format with the stage, which tells Snowflake what kind of files to expect. You will use an external stage in the practical example at the end of this chapter.

The following code demonstrates how to create an external stage with the credentials to authenticate against an Amazon S3 bucket. Here, you replace the AWS_KEY_ID and AWS_SECRET_KEY with the values for your own environment.
CREATE STAGE "BUILDING_SS"."PUBLIC".S3_STAGE URL = s3://building-solutions-with-snowflake' CREDENTIALS = (AWS_KEY_ID = '**********' AWS_SECRET_KEY = '****************************************');

External Tables and Data Lakes

Within your external stage you can have external tables. These objects hold metadata that tells Snowflake where to locate the data files that relate to the table. This approach allows data to sit outside of Snowflake but appear to users as if it resides within Snowflake.

This can be advantageous if you have a large amount of data in cloud storage but the value of a large proportion of it is yet to be determined. Figure 2-1 illustrates a common pattern I see. A data lake is used to store high volumes of raw data cost effectively, while only a subset of high-quality, refined data is loaded into Snowflake. This can be useful when you want your data lake to remain the source of truth. It allows you to expose a subset of key business data and model it into a format that supports the business needs. This approach aims to ensure that the effort required to prepare and guarantee the delivery of data to the business is focused on the data that provides high value.
Figure 2-1

Conceptual data lake pattern

Meanwhile, the majority of the data residing in the data lake can support exploratory analytics. This looks to take a hypothesis from the business and prove it using the data in the data lake.

For example, let’s say a home insurance product is losing money because the staff didn’t accurately price premiums for homes in an area subject to regular flooding. An operating theory is that if they overlay those areas at risk of flooding, based on historical information, against their existing pricing structures, they should be able to mitigate some of their risk by increasing premiums accordingly.

They want to run some analysis to prove (or disprove) their theory. This is where the data lake can help support the required testing. If the theory ends up being correct, at this point the data can be refined and ingested into the data warehouse. This allows the business to work with the new data points to adapt the existing pricing models to refine the premiums charged to certain customers.

Some organizations I have worked for in the past have insisted on having a centralized data lake to store all raw data. The rationale behind this was to provide maximum flexibility should requirements arise that might not be a natural fit for Snowflake. These requirements could be satisfied using another tool along with data in the data lake, for example.

Other clients view a data lake as a layer of protection against “vendor lock-in” so they avoid placing all their data directly into Snowflake. However, you can actually unload data from Snowflake, which I’ll cover later in this chapter.

The key takeaway here is that you don’t need to load data into Snowflake before you can query it. If your data resides in cloud storage, you can work with it in external tables.
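To make this concrete, here is a minimal sketch of how an external table might be defined over the S3_STAGE external stage created earlier. The table name, path, and column expressions are hypothetical, and the exact definition depends on the layout of your files.

CREATE OR REPLACE EXTERNAL TABLE PUBLIC.EXT_SALES
(
  SALE_DATE DATE   AS (TO_DATE(VALUE:c1::STRING)),  -- derived from the first CSV column
  AMOUNT    NUMBER AS (VALUE:c2::NUMBER)            -- derived from the second CSV column
)
LOCATION = @S3_STAGE/sales/
AUTO_REFRESH = FALSE  -- refresh metadata manually with ALTER EXTERNAL TABLE ... REFRESH
FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER = 1);

-- Query it like any other table; the data itself stays in cloud storage
SELECT SALE_DATE, SUM(AMOUNT) FROM PUBLIC.EXT_SALES GROUP BY SALE_DATE;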

Internal Stages

Whereas external stages focus on data outside of Snowflake, internal stages focus on data within Snowflake. There are three types of internal stages: user, table, and named.

User

Each user is allocated a user stage. It suits scenarios where only one user needs to access the staged data before loading it into one or more target tables. You refer to a user stage with the following prefix: @~.

Table

Each table has a table stage associated with it. Multiple users can access the data files, but these files can only load into one target table. You refer to a table stage with the following prefix: @%.

Named

A named internal stage is very similar to a named external stage but, as the name suggests, all the data files exist within Snowflake. Multiple users can access data in this stage and the data files can also target multiple tables. You refer to a named stage with the following prefix: @.
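As a quick sketch of how the three prefixes are used in practice (the file path, table, and stage names here are hypothetical), the following commands stage a local file with PUT, which must be run from a client such as SnowSQL rather than the web UI, and then list each stage’s contents:

-- User stage: private to the current user
PUT file:///tmp/sales_01.csv @~/staged/;
LIST @~;

-- Table stage: tied to a single target table
PUT file:///tmp/sales_01.csv @%sales_transactions;
LIST @%sales_transactions;

-- Named internal stage: shareable across users and target tables
CREATE STAGE IF NOT EXISTS my_named_stage;
PUT file:///tmp/sales_01.csv @my_named_stage;
LIST @my_named_stage;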

File Formats

The file format provides information to Snowflake on how to interpret an incoming file. You can apply a file format on the fly, which you’ll see in the practical example at the end of this chapter. However, you’ll probably need to leverage the same file format multiple times in your solution. It’s best to define a file format once and give it a name so it can be easily referenced when you attempt to load the file. This promotes reusability and, if anything changes in the future, you only need to update the file format in one place. The file format syntax is as follows:
CREATE [ OR REPLACE ] FILE FORMAT [ IF NOT EXISTS ] <name>
                      TYPE = { CSV | JSON | AVRO | ORC | PARQUET | XML } [ formatTypeOptions ]
                      [ COMMENT = '<string_literal>' ]

This command creates an object in the defined schema; therefore, the name must be unique within the schema. You also must specify one of the six supported file types.

Note

File formats are also used for unloading data from Snowflake. Data in Snowflake can be unloaded into JSON or CSV file formats. I cover unloading data in more detail later in this chapter.

In addition to the basic, required parameters in the syntax above, there are a large number of optional parameters. Table 2-1 shows the ones I have found to be particularly useful and therefore use more commonly than others.
Table 2-1

Commonly Used Parameters

COMPRESSION
Tells Snowflake how the staged file is compressed, so it can be read. When unloading, data compression can also be applied once the file has been exported from Snowflake.
Example value: GZIP

RECORD_DELIMITER
The delimiter in the data file used to denote the end of a complete record.
Example value: ','

SKIP_HEADER
Allows you to easily skip a header row by specifying the number of header rows to ignore.
Example value: 1

TRIM_SPACE
A Boolean that specifies whether to remove white space from fields. I have always found this useful when working with CSVs, so I apply it often.
Example value: TRUE

ERROR_ON_COLUMN_COUNT_MISMATCH
Instructs Snowflake whether to fail the load if the column count from the data file doesn’t match the target table. In certain scenarios, I have found this very valuable. For example, your application providing the source data file may append new columns to the end of the file. In certain circumstances, you may want to continue to load the data seamlessly. This allows you to do just that. However, note that you may want to use the default behavior here, which will fail the load if the column count changes. It all comes down to your specific needs in your own environment. It’s one option well worth being aware of.
Example value: TRUE

ENCODING
I always try to create CSVs as UTF8 if I can. UTF8 is Snowflake’s default format, so the ability to standardize the data before loading it into Snowflake can help you avoid potential issues.
Example value: 'UTF8'
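Putting several of these parameters together, here is a sketch of a named file format for comma-delimited, GZIP-compressed CSV files; the format name is hypothetical and the option values should be adjusted to match your own files:

CREATE OR REPLACE FILE FORMAT my_csv_format
  TYPE = 'CSV'
  COMPRESSION = 'GZIP'
  RECORD_DELIMITER = '\n'
  FIELD_DELIMITER = ','
  SKIP_HEADER = 1
  TRIM_SPACE = TRUE
  ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE
  ENCODING = 'UTF8'
  COMMENT = 'GZIP-compressed CSV files with a single header row';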

For comprehensive detail on all available parameters, refer to the official Snowflake documentation at

https://docs.snowflake.com/en/sql-reference/sql/create-file-format.html.

The COPY INTO Command

Once your data is staged into an external stage or one of the internal stage options, you are ready to load it into Snowflake. To do this, you use the COPY INTO command. This command loads staged data into a table, so you’ll be using it often!

COPY INTO Syntax

First up, let’s look at the standard COPY INTO syntax:
COPY INTO [<namespace>.]<table_name>
     FROM { internalStage | externalStage | externalLocation }
[ FILES = ( '<file_name>' [ , '<file_name>' ] [ , ... ] ) ]
[ PATTERN = '<regex_pattern>' ]
[ FILE_FORMAT = ( { FORMAT_NAME = '[<namespace>.]<file_format_name>' |
                    TYPE = { CSV | JSON | AVRO | ORC | PARQUET | XML } [ formatTypeOptions ] } ) ]
[ copyOptions ]
[ VALIDATION_MODE = RETURN_<n>_ROWS | RETURN_ERRORS | RETURN_ALL_ERRORS ]
Let’s break these parameters down so you can look at what is happening here. Table 2-2 describes them.
Table 2-2

Parameters of the COPY INTO Command

namespace
The database and/or schema of the table. If the database and schema are selected as part of the user session, this is optional.
Example value: EDW.sales

table_name
The name of the target table to load the data into.
Example value: sales_transactions

FROM
Specifies the internal or external location where the files containing the data to be loaded are staged.
Example value: s3://raw/sales_system/2021/05/11/ (AWS S3 example)

FILES (optional)
Specifies a list of one or more file names (separated by commas) to be loaded.
Example value: sales_01.csv

PATTERN (optional)
A regular expression pattern string, enclosed in single quotes, specifying the file names and/or paths to match.
Example value: '.*\.csv' (loads all CSV files within the given location)

FILE_FORMAT (optional)
Specifies an existing named file format to use for loading data into the table. See the “File Formats” section of this chapter for more detail.
Example value: ff_csv

TYPE (optional)
Specifies the type of files to load into the table.
Example value: CSV

VALIDATION_MODE (optional)
This is a really handy feature. It allows you to validate a data file without actually loading it into Snowflake first. This parameter is useful for testing staged data files during the early phases of development. It is possible to validate a specified number of rows or the entire file, which makes for shorter iterations when testing your data files.
Example value: RETURN_10_ROWS
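To make these parameters concrete, here is a hedged sketch that first validates and then loads all GZIP-compressed CSV files under one staged path; the table, stage, path, and file format names are hypothetical:

-- Dry run: return the first 10 rows without loading anything
COPY INTO EDW.sales.sales_transactions
  FROM @my_stage/sales_system/2021/05/11/
  PATTERN = '.*\.csv\.gz'
  FILE_FORMAT = (FORMAT_NAME = 'ff_csv')
  VALIDATION_MODE = RETURN_10_ROWS;

-- The real load of the same files
COPY INTO EDW.sales.sales_transactions
  FROM @my_stage/sales_system/2021/05/11/
  PATTERN = '.*\.csv\.gz'
  FILE_FORMAT = (FORMAT_NAME = 'ff_csv');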

Transformations

The COPY INTO command supports some basic, lightweight transformations which can simplify your downstream ETL operations. It is possible to load a subset of table data and reorder, alias, cast, or concatenate columns, as the following examples show. Here is an example of selecting a subset of columns using column position:
COPY INTO home_sales(city, zip, sale_date, price)
   FROM (SELECT t.$1, t.$2, t.$6, t.$7 FROM @mystage/sales.csv.gz t)
   file_format = (format_name = mycsvformat);
This is how you apply a substring function, carry out concatenation, and reorder columns:
COPY INTO HOME_SALES(city, zip, sale_date, price, full_name)
   FROM (SELECT substr(t.$2,4), t.$1, t.$5, t.$4, concat(t.$7, t.$8) from @mystage t)
   file_format = (format_name = mycsvformat);

Data Loading Considerations

To load data into Snowflake, not only do you need to know how to create stages, file formats, and use the COPY INTO command, but you also need to consider other factors that make the data loading process more efficient. The following section covers these key factors.

File Preparation

You’ll want to take advantage of Snowflake’s ability to load data in parallel. So, it is important to consider the size of files to be loaded. They shouldn’t be too large or too small. Snowflake recommends you should aim to produce files of around 100-250MB compressed. This may force you to split larger files into smaller ones or group together several small files into a larger one.

I worked on a project where we needed to migrate a lot of historical data from a legacy on-premise warehouse to Snowflake. To do this, we used our Extract Transform Load (ETL) tool to extract data in monthly increments before we wrote the data out into multiple CSV files. We pushed these files into our cloud storage area (in this case we were using the external stage approach described above). We were then able to load these files in parallel into one target table.

Note

The number and capacity of servers in a virtual warehouse influences how many files you can load in parallel, so it pays off to run some tests and find the balance that works for you.

When working with CSV files (or any delimited text files, for that matter), I have found that it is useful to standardize some of this data before loading it into Snowflake. Where possible, encode the file in the UTF-8 format, which is the default character set in Snowflake. We also chose to escape any single or double quotes in the data and selected unique delimiter characters to be extra sure.

Carriage returns appeared in some of our source data; they were introduced by free-text fields in the front-end application the data came from. For our specific purposes, we were able to remove them as part of the process.

Semistructured Data

All semistructured data in Snowflake is held in a VARIANT column in the target table. I discussed in Chapter 1 how, behind the scenes, Snowflake attempts to treat semistructured data in the same way as relational data by compressing and storing it in a columnar format.

Not every element is extracted into this format. There are some exceptions. One of the most notable is the handling of NULL values. When a “null” value in semistructured data is encountered, Snowflake doesn’t extract that null into a column. The reason for this is so the optimizer can distinguish between a SQL NULL and an element with the value “null.”

The data type you use can also prevent Snowflake from storing the data in an optimized way. Here is an example that contains a number:
{"policynumber":100}
This example contains a string:
{"policynumber":"101"}

Due to the difference in data types used for the same field, Snowflake cannot store these values in an optimized way, and the price you pay is performance. If Snowflake cannot find the extracted element it needs within the columnar format, it must go back and scan the entire JSON structure to find the values.

To provide Snowflake with the best chance of optimizing the storage of semistructured data, there are a couple of things you can do. If you need to preserve the “null” values, you can extract them into a relational format before you load the data into Snowflake. This creates additional upfront work to initially create the relational structure but the performance gains when querying this data in Snowflake may well be worth it. You’ll have to make this assessment when considering how often this data will be queried.

Alternatively, if the “null” values don’t provide any additional information or context, then you can set the STRIP_NULL_VALUES setting to TRUE in the file format.
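As a sketch of the second option, assuming the JSON shown above is staged as a file (the stage, file, and table names here are hypothetical), the “null” values can be stripped at load time through the file format:

-- Target table with a single VARIANT column for the raw JSON
CREATE OR REPLACE TABLE raw_policies (v VARIANT);

COPY INTO raw_policies
FROM @my_stage/policies.json
FILE_FORMAT = (TYPE = 'JSON' STRIP_NULL_VALUES = TRUE);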

Dedicated Virtual Warehouses

When loading data into Snowflake, a key concept to understand is how you can use dedicated virtual warehouses (VWs) to prevent contention. You might have had experiences where a user executes an intensive query before going home for the evening. This query continues to run throughout the night and locks tables, thus preventing the ETL process from running. There were measures you could have taken to prevent this from occurring, of course, but I am using this example to illustrate that the design in Snowflake is different and by nature you can easily avoid running into similar issues.

It is common to have dedicated VWs to supply the resources for your data pipelines and bulk data loads. This ensures that the correct amount of resources is available at the right time to allow for efficient loading of data into Snowflake. Configuring multiple dedicated warehouses to line up with business groups, departments, or applications protects these processes from fighting for the same resources. It also allows you to configure each dedicated warehouse in a specific way to match the demand. Figure 2-2 shows an example setup.
Figure 2-2

Dedicated virtual warehouses
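As a minimal sketch of this kind of separation (the warehouse names and sizes are hypothetical and should be tuned to your own workloads), you might create one warehouse per workload:

-- Dedicated warehouse for data loading pipelines
CREATE WAREHOUSE IF NOT EXISTS WH_ELT_LOAD
  WAREHOUSE_SIZE = 'MEDIUM'
  AUTO_SUSPEND = 60            -- suspend quickly between loads to save credits
  AUTO_RESUME = TRUE
  INITIALLY_SUSPENDED = TRUE;

-- Separate warehouse for BI and reporting queries
CREATE WAREHOUSE IF NOT EXISTS WH_BI_REPORTING
  WAREHOUSE_SIZE = 'SMALL'
  AUTO_SUSPEND = 300
  AUTO_RESUME = TRUE
  INITIALLY_SUSPENDED = TRUE;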

Partitioning Staged Data

One thing that is often overlooked at the outset of a project when planning to load data is the partitioning of data in the external cloud storage locations. Essentially, this means separating the data into a logical structure within AWS S3, Google Cloud Storage, or Azure Blob containers using file paths.

Partitioning not only makes it easier to locate files in the future but is also important for performance purposes. Creating a structure that includes the source system or application name along with the date the data was written should be considered a minimum requirement. This allows you to pinpoint the data to be loaded into Snowflake using the COPY command. Figure 2-3 shows an example of physical tables mapped to logical partitions within an S3 bucket.
Figure 2-3

Partitioning data in S3

You will almost certainly want to consider more granular levels of partitioning when it comes to the date. For example, a common approach is to use year/month/day, which can be broken down even further to hour or minute increments. The choice you make will be based on how many files you plan to generate over a certain time period. The goal is to reduce the number of files sitting in each directory as the COPY statement will read a directory list provided by the cloud storage area.
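With a source system/year/month/day structure in place, a COPY command can then target a single day’s worth of files; the stage path, table, and file format names below are hypothetical:

-- Load only the files written on 11 May 2021 for this source system
COPY INTO STG.SALES
FROM @S3_STAGE/sales_system/2021/05/11/
FILE_FORMAT = (FORMAT_NAME = 'ff_csv');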

Loading Data

As you load data into Snowflake using the COPY INTO command, metadata is tracked and stored for 64 days. This includes several fields for each data file loaded, such as the name of the file, the file size, and the number of rows in the file. For a complete list, refer to the official Snowflake documentation at https://docs.snowflake.com/en/user-guide/data-load-considerations-load.html#load-metadata.

Not only is this metadata very useful for understanding what files were loaded when, but it also prevents the same data files being loaded more than once.

In certain environments, you may have limited control over the data files being delivered to you. In these cases, you may not be able to guarantee that the data files aren’t archived after you’ve loaded them. Also, there could be occasions when the same files are mistakenly delivered to the staging area again. In these instances, you don’t need to worry about writing defensive code to check for this. If you’ve loaded the file in the previous 64 days, Snowflake will simply ignore it. Snowflake calculates an MD5 checksum of each file’s contents when the file is loaded. The checksum is stored in the load metadata and subsequently leveraged to prevent the reloading of duplicate files within the 64-day window.
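You can inspect this load metadata yourself. For example, the COPY_HISTORY table function returns file-level details of recent loads into a given table; the table name below is hypothetical:

SELECT file_name, file_size, row_count, status, last_load_time
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
       TABLE_NAME => 'STG.CUSTOMER',
       START_TIME => DATEADD(hour, -24, CURRENT_TIMESTAMP())));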

You can override this behavior, of course, and this can be useful for testing. Imagine you want to reload the same file again and again to test an end-to-end process. In this case, you can use the FORCE copy option to override the default behavior.
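A short sketch of the FORCE option in use, reloading files even though they were loaded within the last 64 days (the stage and table names are hypothetical):

COPY INTO STG.CUSTOMER
FROM @S3_STAGE/customer/
FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER = 1)
FORCE = TRUE;  -- ignore the load metadata and reload the files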

Loading Using the Web UI

It is also possible to load data via the web UI in Snowflake. Due to several limitations, it’s not something you would use as an enterprise scale process, but rather for simple ad-hoc requests. More typically, non-technical users such as data analysts who are working with small volumes of data may prefer this user-friendly way of loading data into Snowflake so they can carry out analysis on the data. It should go without saying that for regular, operational data loads, this should be incorporated into your ETL process, which will use the COPY INTO command.

Unloading Data from Snowflake

You can also export data that resides in Snowflake using a process known as unloading. I briefly touched on this earlier in this chapter in the “File Format” section. The options here are CSV and JSON, and you can unload them to an internal or external stage.

The following code shows how you can write SQL queries with the COPY INTO command to unload data. This allows you to join tables or filter data out, for example, before you unload the data.
COPY INTO @ext_stage/result/data_
FROM
        (
        SELECT t1.column_a, t1.column_b, t2.column_c
        FROM table_one t1
        INNER JOIN table_two t2 ON t1.id = t2.id
        WHERE t2.column_c = '2018-04-29'
        )

Why would you want to unload data in the first place? Well, this is a very good question! In my opinion, Snowflake had to provide this option to give prospective customers comfort around being tied to a particular vendor. If these customers decided not to have an external stage and moved all their data directly into Snowflake with no way of getting it back out, this could be a major issue in the future. Allowing users to unload data out of Snowflake largely mitigates this risk.

If you’re not going to introduce an external stage or an alternative way of accessing all the raw data, then I would carefully consider this decision. With storage being so cheap, you would need a compelling reason not to externally stage your raw data, given the future flexibility this provides.

Bulk vs. Continuous Loading

Bulk loading means extracting and loading data into Snowflake on a periodic basis. As data accumulates over time, let’s say over a 24-hour period, it produces a batch of records to be loaded in bulk in one go. To do this, you use the COPY INTO command, as discussed.

What happens when you want to load a steady stream of incoming data, such as from Internet of Things devices, directly into Snowflake? This streaming data, where each record is loaded individually as soon as it arrives, is referred to as continuous loading.

The middle ground between the two is known as a micro batch. This is when data arrives every few minutes and is then loaded into Snowflake.

Micro batches and continuous data loads require a different solution to bulk loading data. Thankfully Snowflake has an answer to this, with a service called Snowpipe, which I’ll get into next.

Continuous Data Loads Using Snowpipe

Snowpipe is a fully managed service designed to quickly and efficiently move smaller amounts of frequently arriving data into a Snowflake table from a stage.

You must create files first, meaning you cannot stream data directly into Snowflake from Kafka or Kinesis. You stream data into an external stage, such as S3. S3 calls Snowpipe using an event notification to tell Snowpipe there is new data to process. The data is then pushed into a queue (continuing with our AWS example, an SQS queue). The Snowpipe service then consumes and processes data from this queue and pushes the data to a target table in Snowflake. Figure 2-4 illustrates the logical architecture for this scenario.
Figure 2-4

Continuous data loading with Snowpipe

An alternative approach is to use REST API calls. You can write an application in a Lambda function that generates data and pushes it to an S3 bucket. It then invokes Snowpipe’s API, which places the files in a queue before loading them into the target table. The time from making a call (using either method) to seeing the data arrive in the target table is around 1 minute.
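A minimal sketch of the auto-ingest setup described above, assuming the external stage and staging table already exist (the pipe, stage, and table names are hypothetical). The notification channel returned by SHOW PIPES is the SQS queue you configure the S3 event notification to publish to:

CREATE OR REPLACE PIPE STG.CUSTOMER_PIPE
  AUTO_INGEST = TRUE
AS
COPY INTO STG.CUSTOMER
FROM @S3_STAGE/customer/
FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER = 1);

-- The NOTIFICATION_CHANNEL column holds the SQS ARN to use in the S3 event notification
SHOW PIPES;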

Snowpipe saves you from running multiple, repeatable COPY INTO statements, supports semistructured data, removes the need for tuning or any other additional management, and you don’t need to worry about the resources, as it’s completely serverless.

Serverless means you don’t need to manage the underlying infrastructure or be concerned with scaling up or down. You still need to pay using Snowflake credits, and on your bill, you’ll see Snowpipe listed as a separate warehouse. However, there’s no virtual warehouse for you to configure and manage while using Snowpipe. Snowflake takes care of this for you.

Additionally, you only pay for the compute time used to load data. Snowpipe is the only service with true per-second billing; it is not subject to the minimum 60-second charge associated with your own virtual warehouses.

Streams and Tasks

Setting up and ingesting data from continuous data pipelines creates some new challenges.
  • How can you identify new and changed data?

  • Where is it best to carry out transforms by applying business logic?

  • How can you orchestrate the load and manage dependencies?

  • How can you ensure reliability in the cloud?

This is where streams and tasks come into the picture. They are two independent features of Snowflake but are commonly used in tandem, as you’ll discover next.

Change Tracking Using Streams

Suppose you are loading data into a staging table using Snowpipe. Let’s say you want to track changes on that table in order to merge those incremental changes into a target table.

A stream object tracks any DML operations (inserts, updates, and deletes) against the source table. This process, known as Change Data Capture (CDC), isn’t new but has become far easier to implement over time. One of the primary benefits of using CDC is to help streamline the movement of data. For example, if you have a very large transaction table in your source system containing millions and millions of records, with 1,000 new transactions being added per day, yet no records change historically, you wouldn’t want to reload the entire table each day. Using CDC against this table allows you to identify and target just those 1,000 records for extraction. This makes the entire process faster and more efficient.

In Snowflake, when you create a stream, a few things happen. Firstly, a pair of hidden columns is added to the source table, and they begin to store change tracking metadata. At the same time, a snapshot of the source table is logically created. This snapshot acts as a baseline, so that all subsequent changes to the data can be identified. This baseline is referred to as an offset. You can think of it like a bookmark, which stores a position against a sequence of changes. The stream also creates a change table, which stores both the before and after records between two points in time. This change table mirrors the structure of the source table along with the addition of some very handy change tracking columns. You can then point your processes to this change table and process the changed data.

Multiple queries can query the same changed records from a stream without changing the offset. It is important to note that the offset is only moved when stream records are used within a DML transaction. Once the transaction commits, the offset moves forward, so you cannot reread the same record again.
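A small sketch of this behavior using hypothetical stream and table names; a plain SELECT leaves the offset where it is, while consuming the stream inside a DML statement advances it once the transaction commits (this assumes the target table’s columns line up with the stream’s columns, including the metadata columns):

-- Repeatable: reading the stream does not advance the offset
SELECT * FROM CDC.CUSTOMER_STREAM;

-- Consuming: once this insert commits, the offset moves forward
INSERT INTO CDC.CUSTOMER_CHANGE_LOG
SELECT * FROM CDC.CUSTOMER_STREAM;

-- The same change records are no longer returned
SELECT * FROM CDC.CUSTOMER_STREAM;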

Stream Metadata Columns

As mentioned, when the change table is created, it mirrors the source table structure and adds some metadata columns to the stream. These columns are there to help you understand the nature of the changes applied to the source table, so you can process them correctly. Table 2-3 shows the metadata columns that are added to the stream.
Table 2-3

Stream Metadata Columns

METADATA$ACTION
This tells you what DML action was performed (INSERT or DELETE). Note: An update is effectively a DELETE followed by an INSERT.

METADATA$ISUPDATE
A Boolean value that indicates whether the records were part of an UPDATE operation. When TRUE, you should expect to see a pair of records, one with a DELETE and one with an INSERT. Note: The stream provides you with the net change between two offset positions, so if a record was inserted and subsequently updated within the offsets, it will be represented just as a new row with the latest values. In this case, this field will be FALSE.

METADATA$ROW_ID
A unique ID for the row. This can be very helpful for row-level logging and auditability throughout your system. I recommend capturing this and storing it as part of your solution so you can accurately track the flow of data through your system.
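For example, to pull only the brand-new rows out of a stream (using the stream and column names from the practical example at the end of this chapter), you can filter on these metadata columns:

SELECT C_CUSTKEY, C_NAME, C_PHONE
FROM CDC.CUSTOMER
WHERE METADATA$ACTION = 'INSERT'
  AND METADATA$ISUPDATE = FALSE;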

Tasks

In this chapter, you’ve learned that using the COPY INTO command allows for batch loads of data, while Snowpipe makes it easy to ingest data into Snowflake on a near real-time basis into a staging table. Adding streams allows for identifying changed records.

Theoretically, then, this makes the data available to be consumed directly from the staging table. However, it is more than likely you’ll have to join the incoming stream of data to data from other sources and carry out transformations to make sense of the data before making it available to your end users. You’ll probably have a sequence of steps you need to apply in a specific order. You’ll also want to automate this and make it easy to schedule the execution of these steps. This is where tasks enter the frame.

Each task executes a single SQL statement, and you can chain together a series of tasks and dependencies so they execute in the required order.

A typical pattern is to have a task running every 10 minutes, which checks the stream for the presence of records to be processed using the system function SYSTEM$STREAM_HAS_DATA('<stream_name>'). If the function returns FALSE, there are no records to process and the task exits. If it returns TRUE, there are new records to be consumed. In this case, the task contains SQL logic that applies transformations or uses stored procedures or user-defined functions before merging those changes into a target table. This data is then ready to be used in dashboards and analytical models, or to be made available to operational systems.

As you’re running DML statements against the data, you’ll need compute resources from a virtual warehouse. When you create a task, then, you must specify a warehouse for it to use. A tree of tasks has a single parent task, known as the root task. The root task must also have a schedule associated with it. This can be a duration, such as 5 minutes, or a CRON expression. It’s worth bearing in mind that because you are automatically charged for the first 60 seconds of a warehouse starting, setting a duration as low as 5 minutes can incur a significant number of credits. Figure 2-5 provides a flow of a simple chain of tasks to populate a data warehouse.
Figure 2-5

A simple chain of tasks

Child tasks can then be chained together to execute by using CREATE TASK ... AFTER and specifying the name of the preceding task. The following code shows the syntax:
CREATE [ OR REPLACE ] TASK [ IF NOT EXISTS ] <name>
  WAREHOUSE = <string>
  [ SCHEDULE = '{ <num> MINUTE | USING CRON <expr> <time_zone> }' ]
  [ ALLOW_OVERLAPPING_EXECUTION = TRUE | FALSE ]
  [ <session_parameter> = <value> [ , <session_parameter> = <value> ... ] ]
  [ USER_TASK_TIMEOUT_MS = <num> ]
  [ COPY GRANTS ]
  [ COMMENT = '<string_literal>' ]
  [ AFTER <string> ]
[ WHEN <boolean_expr> ]
AS
  <sql>

Full details on all parameters can be found in the official Snowflake documentation.
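As a short sketch of a chained setup (the warehouse, task, stage, and table names are hypothetical), a root task runs on a CRON schedule and a child task runs after it completes:

-- Root task: runs at 06:00 UTC every day
CREATE OR REPLACE TASK LOAD_STAGING
  WAREHOUSE = COMPUTE_WH
  SCHEDULE = 'USING CRON 0 6 * * * UTC'
AS
COPY INTO STG.SALES
FROM @S3_STAGE/sales_system/
FILE_FORMAT = (FORMAT_NAME = 'ff_csv');

-- Child task: runs only after the root task completes
CREATE OR REPLACE TASK TRANSFORM_SALES
  WAREHOUSE = COMPUTE_WH
  AFTER LOAD_STAGING
AS
INSERT INTO TGT.SALES SELECT * FROM STG.SALES;

-- Tasks are created suspended; resume the child first, then the root
ALTER TASK TRANSFORM_SALES RESUME;
ALTER TASK LOAD_STAGING RESUME;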

Bringing It All Together

So now let’s bring this all together in a practical example. You can choose to use this as a reference or follow along using the scripts. I recommend the latter, as it will help to cement your understanding.

The Example Scenario

Figure 2-6 illustrates what you’ll put together. In this example, your objective is simply to keep a target table in sync with your source system. This example uses an external stage, the COPY INTO command to unload and load data between the external stage and Snowflake, plus streams and tasks.
Figure 2-6

High-level diagram of the practical example

Note

In this example, you’ll be using an external stage on Amazon S3. I’ll cover setting up an S3 bucket and allowing public access, but I won’t get into the specifics around S3 as this is outside the scope of this book. If you need additional help with this step, refer to the AWS technical documentation online, or use an internal stage.

Steps

In the following steps you’ll create an Amazon S3 bucket using the AWS console and configure security to allow Snowflake to access the data within the bucket (using an external stage). You’ll then unload some data from a Snowflake sample database to your new S3 bucket. The unloaded data will then be used to simulate data loads into Snowflake, allowing you to use streams and tasks to orchestrate efficient data movement between Snowflake tables.
  1.

    Create a new Amazon S3 bucket to act as your external stage, which will store your data. Log into the AWS Console, go to the AWS S3 dashboard, and select the Create Bucket option shown in Figure 2-7.

     

Figure 2-7.

  2.

    Provide a name for your bucket and select a region (Figure 2-8).

     

Figure 2-8.

  3.

    AWS handily blocks all public access to your newly created S3 bucket. Uncheck the Block all public access check box and tick the box at the bottom of the page to confirm you acknowledge the implications of what you’re about to do (Figure 2-9). Note that you’d NEVER do this in a commercial environment, but as this is just a test using sample data, you’re going to allow all public access to your bucket. I cover how to best manage security and access control in Chapter 4.

     

Figure 2-9.

  4.

    Create a folder in your S3 bucket to store the data. Give your folder a name and click Create Folder (Figure 2-10).

     

Figure 2-10.

  5.

    Your S3 dashboard view should now look similar to Figure 2-11.

     

Figure 2-11.

  6.

    You now need to generate the access keys that Snowflake will require to access the S3 bucket. In the AWS console, go to the Identity and Access Management (IAM) section (Figure 2-12).

     

Figure 2-12.

  7.

    Select Access Keys from the main window and click Create New Access Key (Figure 2-13).

     

Figure 2-13.

  8.

    Click Show Access Key and make a note of the Key ID and Secret Key. It is important you keep these safe and don’t share them with anyone (Figure 2-14).

     

Figure 2-14.

  9.

    Now move over to Snowflake to create a new database along with the required schemas and tables.

     
--CREATE DATABASE
CREATE OR REPLACE DATABASE BUILDING_SS;
--SWITCH CONTEXT
USE DATABASE BUILDING_SS;
--CREATE SCHEMAS
CREATE SCHEMA STG;
CREATE SCHEMA CDC;
CREATE SCHEMA TGT;
--CREATE SEQUENCE
CREATE OR REPLACE SEQUENCE SEQ_01
START = 1
INCREMENT = 1;
--CREATE STAGING TABLE
CREATE OR REPLACE TABLE STG.CUSTOMER
(C_CUSTKEY NUMBER(38,0),
 C_NAME VARCHAR(25),
 C_PHONE VARCHAR(15));
CREATE OR REPLACE TABLE TGT.CUSTOMER
(C_CUSTSK int default SEQ_01.nextval,
  C_CUSTKEY NUMBER(38,0),
 C_NAME VARCHAR(25),
 C_PHONE VARCHAR(15),
 DATE_UPDATED TIMESTAMP DEFAULT CURRENT_TIMESTAMP());
  10.

    Create an external stage to point to your S3 bucket. You can choose to do this through the UI by selecting the newly created BUILDING_SS database, selecting Stages, and selecting the Create button. In the next screen, select Existing Amazon S3 Location (Figure 2-15). Then complete the required details.

     

Figure 2-15.

  11.

    Alternatively, you can create a stage using SQL, as follows:

     
CREATE STAGE "BUILDING_SS"."PUBLIC".S3_STAGE URL = s3://building-solutions-with-snowflake' CREDENTIALS = (AWS_KEY_ID = '**********' AWS_SECRET_KEY = '****************************************');
  12.

    Grant permissions on the stage and check the existence of it. Note that after creating the STAGE in the previous step, you may need to create a new session by opening a new query window before running these statements.

     
--GRANT PERMISSIONS ON STAGE
GRANT USAGE ON STAGE S3_STAGE TO SYSADMIN;
--SHOW STAGES
SHOW STAGES;
  13.

    You should see results similar to Figure 2-16.

     

Figure 2-16.

  14.

    Next, you’re going to unload some data, 150K records to be precise, from the SNOWFLAKE_SAMPLE_DATA database in Snowflake to S3. The following command will unload the data as four files in your S3 bucket. Notice that you use the HEADER parameter to ensure each file contains a header record.

     
--UNLOAD DATA TO S3 EXTERNAL STAGE
COPY INTO @S3_STAGE/Customer FROM "SNOWFLAKE_SAMPLE_DATA"."TPCH_SF1"."CUSTOMER"
HEADER=TRUE;
  15.

    Refresh the S3 management console and you should see something similar to Figure 2-17.

     

Figure 2-17.

  16.

    Copy the data from S3 into your staging table. As you will see in the S3 bucket, data is automatically compressed into GZIP by default. Take note of the FROM statement where you specify only the fields you’re interested in by column position. You also apply certain file format parameters to tell Snowflake how to interpret the file.

     
--COPY INTO TABLE
COPY INTO STG.CUSTOMER (C_CUSTKEY, C_NAME, C_PHONE)
FROM (SELECT $1, $2, $5 FROM  @S3_STAGE/)
FILE_FORMAT=(TYPE = 'CSV' FIELD_DELIMITER = ',' SKIP_HEADER = 1 COMPRESSION = 'GZIP');
--CONFIRM YOU HAVE 150K RECORDS IN THE STAGING TABLE
SELECT COUNT(*)
FROM STG.CUSTOMER;
  17.

    Insert all the data from your staging table to your target table. This seeds the table to ensure all your data is in sync before you look to track changes.

     
--SEED TABLE
INSERT INTO TGT.CUSTOMER (C_CUSTKEY, C_NAME, C_PHONE)
SELECT  C_CUSTKEY,
        C_NAME,
        C_PHONE
FROM STG.CUSTOMER;
  18.

    Create a stream against your staging table to track subsequent changes.

     
--CREATE STREAM
CREATE OR REPLACE STREAM CDC.CUSTOMER
ON TABLE STG.CUSTOMER;
  19.

    Check that the stream has been created. Again, you may need to open a new worksheet to see the results (Figure 2-18).

     
--SHOW STREAMS
SHOW STREAMS;

Figure 2-18.

  20.

    Check the change table and note the three metadata columns (Figure 2-19).

     
--CHECK CHANGE TABLE FOR METADATA COLUMNS
SELECT *
FROM CDC.CUSTOMER;

Figure 2-19.

  21.

    Now, imagine you have a process that writes changes to your staging table. For ease, you’re going to execute a series of DML statements against the staging table to simulate source system changes. First, let’s run an update against a record.

     
UPDATE STG.CUSTOMER
SET C_PHONE = '999'
WHERE C_CUSTKEY = 105002;
  22.

    Now you can query the change table and see that you have two records, a DELETE and an INSERT, and that the value of METADATA$ISUPDATE is TRUE (Figure 2-20).

     
SELECT * FROM CDC.CUSTOMER;

Figure 2-20.

  23.

    You can also call the following function to check if there are records in the stream waiting to be processed. This system function returns a Boolean value; if True, there is data waiting to be processed. You’ll use this as part of a conditional statement when deciding to merge changes to your target table.

     
SELECT SYSTEM$STREAM_HAS_DATA('CDC.CUSTOMER')
  24.

    Next, you create a task to merge these changes (captured in the stream) into the target table (Figure 2-21).

     
CREATE OR REPLACE TASK CDC.MERGE_CUSTOMER
  WAREHOUSE = COMPUTE_WH --YOU MUST SPECIFY A WAREHOUSE TO USE
  SCHEDULE = '5 minute'
WHEN
  SYSTEM$STREAM_HAS_DATA('CDC.CUSTOMER')
AS
MERGE INTO TGT.CUSTOMER TGT
USING CDC.CUSTOMER CDC
ON TGT.C_CUSTKEY = CDC.C_CUSTKEY
WHEN MATCHED AND METADATA$ACTION = 'INSERT' AND METADATA$ISUPDATE = 'TRUE'
THEN UPDATE SET TGT.C_NAME = CDC.C_NAME, TGT.C_PHONE = CDC.C_PHONE
WHEN NOT MATCHED AND METADATA$ACTION = 'INSERT' AND METADATA$ISUPDATE = 'FALSE' THEN
INSERT (C_CUSTKEY, C_NAME, C_PHONE) VALUES (C_CUSTKEY, C_NAME, C_PHONE)
WHEN MATCHED AND METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE = 'FALSE' THEN
DELETE;
--BY DEFAULT A TASK IS SET UP IN SUSPEND MODE
SHOW TASKS;
--ENSURE SYSADMIN CAN EXECUTE TASKS
USE ROLE accountadmin;
GRANT EXECUTE TASK ON ACCOUNT TO ROLE SYSADMIN;
--YOU NEED TO RESUME THE TASK TO ENABLE IT
ALTER TASK CDC.MERGE_CUSTOMER RESUME;

Figure 2-21.

  25.

    Use the TASK_HISTORY table function to monitor your task status.

     
SHOW TASKS;
SELECT * FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME=>'MERGE_CUSTOMER'));
  26.

    Once your task has executed, check the target table to ensure the record has been updated successfully.

     
SELECT *
FROM TGT.CUSTOMER
WHERE C_CUSTKEY = 105002;
  27.

    Now, insert a new record into your staging table.

     
INSERT INTO STG.CUSTOMER (C_CUSTKEY, C_NAME, C_PHONE)
SELECT 99999999, 'JOE BLOGGS', '1234-5678';
  28.

    And again, confirm that the change has been processed into the target table.

     
SELECT * FROM TGT.CUSTOMER
WHERE C_CUSTKEY = 99999999;
  29.

    Delete a record and check the results in the target table. This time you shouldn’t see this record in the target.

     
DELETE FROM STG.CUSTOMER WHERE C_CUSTKEY = 99999999;
  30.

    That concludes the testing, so if you want, you can drop the database and suspend the warehouse.

     
DROP DATABASE BUILDING_SS;
ALTER WAREHOUSE COMPUTE_WH SUSPEND;

Summary

In this chapter on data movement, you looked at the various ways you can get data into Snowflake. This includes the different types of stages available, how to prepare your data files, and how to partition the data effectively prior to loading it into Snowflake.

You learned how to load data from outside Snowflake quickly and efficiently using the COPY INTO command. You also looked at the benefits of using a dedicated virtual warehouse for certain workloads.

The frequency of processing, by batch or near real-time, influences the solution and the tool selection you’ll make. As part of this discussion, I introduced Snowpipe, Snowflake’s fully managed and serverless data pipeline service to support continuous data loading.

You looked at how easy Snowflake makes it to capture changes to records in a source table using streams and how to orchestrate the end-to-end data load process using tasks. Finally, you brought everything you learned together with a practical example.

Now that you know how to get your data into Snowflake, you can really start to work with the native features and services. In the next chapter, you explore data cloning, which, if used in the right way, can bring a significant amount of value and business agility.
