Chapter 7. Optimizing Performance and Cost

Performance tuning of BigQuery is usually carried out because we want to reduce query execution times or cost, or both. In this chapter, we look at a number of performance optimizations that might work for your use case.

Principles of Performance

Donald Knuth, the legendary computer scientist, made the famous observation that premature optimization is the root of all evil. Yet Knuth’s full quote is more balanced:1

We should forget about small efficiencies, say about 97% of the time: premature optimization is the root of all evil. Yet we should not pass up our opportunities in that critical 3%. A good programmer will not be lulled into complacency by such reasoning, he will be wise to look carefully at the critical code; but only after that code has been identified.

Following Knuth, we would like to caution that performance tuning should be carried out only at the end of the development stage, and only if it is observed that typical queries take too long. It is far better to have flexible table schema and elegant, readable, and maintainable queries than to obfuscate your table layouts and queries in search of a tiny bit of added performance. However, there will be instances for which you do need to improve the performance of your queries, perhaps because they are carried out so often that small improvements are meaningful. Another aspect to consider is that knowledge of performance trade-offs can help you in deciding between alternative designs.

Key Drivers of Performance

In this chapter, we do two things. In the first part, we show you how to measure the performance of queries so that you can identify the critical, optimizable parts of your program. Then we draw on the experience of BigQuery users and our knowledge of BigQuery’s architecture to identify the types of things that tend to fall into Knuth’s critical 3%. This way, we can design table schema and queries with an awareness of where performance bottlenecks are likely to occur, thereby helping us make optimal choices during the design phase.

To optimize the performance of queries in BigQuery, it helps to understand the key drivers of query speed, which is the focus of the second part of this chapter. The time taken for a query to complete depends on how much data is read from storage, how that data is organized, how many stages your query requires, how parallelizable those stages are, how much data is processed at each stage, and how computationally expensive each of the stages is.

In general, a simple query that reads three columns will take 50% more time than a query that reads only two columns, because the three-column query needs to read 50% more data.2 A query that requires a group-by will tend to be slower than a query that doesn’t, because the group-by operation adds an extra stage to the query.

Controlling Cost

The cost of a query depends on your pricing plan. There are two types of BigQuery pricing plans. The first type is on-demand pricing, in which your organization pays Google based on the amount of data processed by your queries. If you are on a flat-rate plan, your business gets a certain number of slots3 (e.g., 500 slots), and you can run as many queries as you want without incurring any additional costs.

In an on-demand (per query) pricing plan, the cost of a query is proportional to the amount of data processed by the query. To reduce cost in an on-demand pricing model, your queries should process less data. In general, reducing the amount of data scanned will also improve query speed. The third part of this chapter—optimizing how data is stored and accessed—should be of help there.

If you are using a flat-rate reservation, the net cost of your query is quite aligned with the time taken for the query to complete.4 You can indirectly reduce costs in a flat-rate model by hogging the slot reservations for less time—that is, by increasing your query speeds, as discussed in the second part of this chapter.

Estimating per-query cost

If you are on an on-demand pricing plan, you can obtain a cost estimate for a query before submitting it to the service. The BigQuery web user interface (UI) validates queries and provides an estimate of the amount of data that will be processed. You can see the number of bytes the query will process before you run the query by clicking the Query Validator. If you are using the bq command-line client, specify --dry_run to take a look at the query plan and the amount of data processed before invoking the query for real. Dry runs are free. Knowing the amount of data that will be processed by the query, you can use the Google Cloud Platform (GCP) Pricing Calculator to estimate the cost of the query in dollars. Tools such as BigQuery Mate and superQuery provide a price estimate directly but might not have access to information about negotiated discounts. As of this writing, the cost is five dollars per terabyte (for US and EU multiregions) after the free tier usage of one free terabyte per month is exceeded. Note that BigQuery needs to read only the columns that are referenced in your queries, and partitioning and clustering can further reduce the amount of data that needs to be scanned (and hence the cost).
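
For example, here is a minimal sketch of a dry run from the command line using the bq tool (the query text is just an illustration):

bq query --use_legacy_sql=false --dry_run \
'SELECT start_station_name, COUNT(*) AS num_trips
 FROM `bigquery-public-data`.london_bicycles.cycle_hire
 GROUP BY start_station_name'

The dry run validates the query and reports how many bytes it would process, without running it or incurring charges.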

Tip

To experiment with BigQuery, you can use the BigQuery sandbox. This is subject to the same limits as the free tier (10 GB of active storage and 1 TB of processed query data per month), but you can access it without a credit card being required.5

When invoking a query, you can specify the --maximum_bytes_billed parameter to put a limit on the amount of data that the query can process. If the number of bytes scanned by the query exceeds this limit, the query fails without incurring a charge. You can also manage costs by requesting a custom quota from the GCP Cloud Console to limit the amount of query data processed per day. You can set this limit at a per-project or per-user level.
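
As an illustration (the limit of one billion bytes here is arbitrary), the flag can be passed to bq query like this; a query that would scan more than the limit is rejected before any work is billed:

bq query --use_legacy_sql=false \
  --maximum_bytes_billed=1000000000 \
'SELECT COUNT(*) FROM `bigquery-public-data`.london_bicycles.cycle_hire'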

Finding the most expensive queries

When trying to control costs, it can be helpful to create a short list of queries to focus on. You can do this by querying the INFORMATION_SCHEMA associated with a project to find the most expensive queries:

SELECT
   job_id
   , query
   , user_email
   , total_bytes_processed
   , total_slot_ms
FROM `some-project`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE EXTRACT(YEAR FROM creation_time) = 2019
ORDER BY total_bytes_processed DESC
LIMIT 5

The preceding query lists the five most expensive queries in 2019 in some project based on total_bytes_processed. If you are on flat-rate pricing, you might choose to order the queries based on total_slot_ms instead.

Measuring and Troubleshooting

To tune the performance of queries, it is important to ascertain all of the following aspects of a query so that you know what to focus on:

  • How much data is read from storage and how that data is organized

  • How many stages your query requires and how parallelizable those stages are

  • How much data is processed at each stage and how computationally expensive each stage is

As you can see, each of these pairs an aspect that can be measured (e.g., how much data is read) with something that needs to be understood (e.g., the performance implications of how that data is organized).

In this section, we look at how we can measure the performance of a query, peruse BigQuery logs, and examine query explanations. Having done this, we can use our understanding of BigQuery architecture (Chapter 6) and performance characteristics (later sections in this chapter) to potentially improve performance. In the later sections of this chapter, we present queries and their performance characteristics without spelling out the measurement steps that might lead you to apply those performance improvements.

Measuring Query Speed Using REST API

Because BigQuery has a REST API, it is possible to use any web service measurement tool to measure how long a query takes to execute. If your organization already uses one of these tools, it is quite straightforward to use them to measure the performance of a query.

Occasionally, you will need to measure performance of a query from a server where these rich clients are not installed. On such bare-bones machines, the simplest way to measure query time is to use the Unix tools time and curl. As explained in Chapter 5, we can read in the query text and request JSON into Bash variables:6

read -d '' QUERY_TEXT << EOF
SELECT 
  start_station_name
  , AVG(duration) as duration
  , COUNT(duration) as num_trips
FROM \`bigquery-public-data\`.london_bicycles.cycle_hire
GROUP BY start_station_name 
ORDER BY num_trips DESC 
LIMIT 5
EOF
 
read -d '' request << EOF
{
  "useLegacySql": false,
  "useQueryCache": false,
  "query": "${QUERY_TEXT}"
}
EOF
request=$(echo "$request" | tr '\n' ' ')

One key point to note from the preceding code snippet is that we need to turn the query cache off so that all the necessary processing is carried out on the server side each time we invoke the query.

Chapter 5 also discusses how to use the gcloud command-line tool to get the access token and project ID that we require to invoke the REST API:

access_token=$(gcloud auth application-default print-access-token)
PROJECT=$(gcloud config get-value project)

Finally, we invoke the query repeatedly and compute the total time taken so that we can compute the average query performance and not be at the mercy of occasional network hiccups:

NUM_TIMES=10
time for i in $(seq 1 $NUM_TIMES); do
echo -en "\n ... $i / $NUM_TIMES ..."
curl --silent \
   -H "Authorization: Bearer $access_token" \
   -H "Content-Type: application/json" \
   -X POST \
   -d "$request" \
   "https://www.googleapis.com/bigquery/v2/projects/$PROJECT/queries" > /dev/null
done

When we did this, we got the following result:

real    0m16.875s
user    0m0.265s
sys     0m0.109s

The total time to run the query 10 times was 16.875 seconds, indicating that the query took 1.7 seconds on average. Note that this includes the roundtrip time to the server and time spent fetching results; it is not purely the query processing time.

We can estimate what this roundtrip time is by turning the query cache on:

read -d '' request << EOF
{
  "useLegacySql": false,
  "useQueryCache": true,
 "query": "${QUERY_TEXT}"
}
EOF

When we repeat the time query again, we get the following:

real    0m6.760s
user    0m0.264s
sys     0m0.114s

Because the query is cached, the new numbers are almost all due to network latency. This indicates that the actual query processing time is (16.875 – 6.760)/10, or about 1 second.

Measuring Query Speed Using BigQuery Workload Tester

Although using a web service measurement tool or Unix low-level tools is possible and desirable on bare-bones systems, we recommend that you use the BigQuery Workload Tester for measuring the speed of BigQuery queries in your development environment. Unlike a vanilla web service measurement tool, the Workload Tester is able to net out the roundtrip network time (over which you have little control) and report the query processing time (which is what you want to optimize) without having to repeat the queries. It can measure the time taken for individual queries and for workloads (queries that need to be executed serially), and it can invoke the queries in parallel if you want concurrency testing.

The Workload Tester requires Gradle, an open source build tool. Thus, to install the Workload Tester, you first need to install Gradle. Cloud Shell provides a quick way to try out the Workload Tester. On it and other Debian-based Linux systems, you can install Gradle using the following command:

sudo apt-get -y install gradle

On macOS, you can use this:

brew install gradle

For other operating systems, see the Gradle installation instructions.

Then clone the GitHub repository containing the Workload Tester, and build it from source:7

git clone https://github.com/GoogleCloudPlatform/pontem.git
cd pontem/BigQueryWorkloadTester
gradle clean :BigQueryWorkloadTester:build

Let’s measure the speed of a query to find the average duration of bicycle trips in London. As with our script in the previous section, we could have simply embedded the query in a Bash variable, but it is helpful to have a record of queries measured, so we will write the query text to a file:8

cat <<EOF | tr '\n' ' ' > queries/busystations.sql
SELECT 
  start_station_name
  , AVG(duration) as duration
  , COUNT(duration) as num_trips
FROM \`bigquery-public-data\`.london_bicycles.cycle_hire
GROUP BY start_station_name 
ORDER BY num_trips DESC 
LIMIT 5
EOF

We then create a configuration file listing the workloads to run; each workload consists of a set of queries or query files:

cat <<EOF>./config.yaml
concurrencyLevel: 1
isRatioBasedBenchmark: true
benchmarkRatios: [1.0, 2.0]
outputFileFolder: $OUTDIR
workloads:
- name: "Busy stations"
  projectId: $PROJECT
  queryFiles:
     - queries/busystations.sql
  outputFileName: busystations.json
EOF

In this configuration, we set the base concurrency level to 1, meaning that we send only one query at a time. We do, however, also specify a set of benchmark ratios to measure query times at concurrency levels between 1.0 and 2.0 times the base concurrency level (i.e., 1 and 2 concurrent queries). To try concurrency levels of 1, 2, 5, 10, 15, and 20, use the following:

concurrencyLevel: 10
isRatioBasedBenchmark: true
benchmarkRatios: [0.1, 0.25, 0.5, 1.0, 1.5, 2.0]

Then we launch the measurement tool by using this:

gradle clean :BigQueryWorkloadTester:run

The result is a file that contains both the total elapsed time (including roundtrip) and the actual processing time for each of the queries that were run. In our case, the first query (with concurrency level of 1) had a query processing time of 1,111 milliseconds (1.111 seconds), whereas the second and third queries (which ran simultaneously because of the concurrency level of 2) had processing times of 1.108 seconds and 1.026 seconds. In other words, BigQuery provided nearly the same performance whether it was handling one query or two. 

Troubleshooting Workloads Using Stackdriver

Aside from measuring the speed of individual queries, it can be helpful to gauge the performance of entire workflows using the BigQuery logs. You can do this from the GCP web console, in the Stackdriver Logging section. For example, you can look at warnings (and more severe errors) issued by BigQuery for queries and operations on the dataset from Chapter 5 by selecting the ch05 dataset and Warning from the drop-down menus, as shown in Figure 7-1.

Figure 7-1. Use Stackdriver to look at log messages emanating from BigQuery

This ability to view all BigQuery error messages from a project in a centralized location can be helpful, especially if the queries emanate from scripts and dashboards that don’t display BigQuery error messages. Indeed, it is possible (provided you have the necessary permissions) to look at the logs and piece together the set of operations that are being carried out by a workload to determine whether unnecessary operations are being performed. For example, look at the subset of the operations on the dataset ch05eu, shown in Figure 7-2, and read it from the bottom to the top.

Figure 7-2. It is possible to use Stackdriver to piece together the set of operations carried out by a workload

In this case, it appears that a dataset named ch05eu was created, and a table named cycle_stations_copy was added to it. Then an attempt was made to delete the ch05eu dataset, but it failed because the dataset was not empty. A new table named bad_bikes was added. After this, the bad_bikes table was deleted, and the cycle_stations_copy table was also deleted. Finally, the dataset itself was deleted.

We can examine the details of each of the jobs—for example, let’s look at the first insert job that created the cycle_stations_copy. The details include the schema of the created table, as illustrated in Figure 7-3.

Figure 7-3. Examine the details of an insert job to ascertain the schema of the table being created

Given these details and knowledge of the context, it might be the case that the cycle_stations_copy table did not use any of the fields in bad_bikes. Perhaps the entire set of operations around bad_bikes was unnecessary and can be removed from the workflow.

Reading Query Plan Information

In addition to measuring query speed and examining the logs, you can diagnose query performance by looking at information available about the query plan. The query plan information lists the stages into which the query is broken down and provides information about the data processed in each of the execution steps that make up each stage. The query plan information is available in JSON form from the job information and visually in the BigQuery web UI.

In BigQuery, the execution graph of an SQL statement is broken up into query stages, where each stage consists of units of work that are executed in parallel by many workers. The stages communicate via a distributed shuffle architecture (see Chapter 6), and so most stages start by reading the output of previous stages and end by writing to the input of subsequent stages. Keep in mind that it is not necessary for a previous stage to complete before a subsequent stage starts—stages can start with the data at hand. So stages do not execute sequentially.

Note

You should keep in mind that the query plan is dynamic given that the exact data size and computational cost of intermediate stages is not known before the stage in question is executed. If the actual size is very different from the anticipated size, new stages might be introduced so as to repartition the data and improve data distribution among the workers. Because of the dynamic nature of the query plan, when exploring query performance, look at the query plan information after the query is complete.

Obtaining query plan information from the job details

The information listed about each stage of a completed query in the job information includes the timestamps at which the stage was started and finished, the total number of records read and written, and the number of bytes written across all workers in order to process this query. For example, try executing the following query:

SELECT
  start_station_name,
  AVG(duration) AS duration,
  COUNT(duration) AS num_trips
FROM
  `bigquery-public-data`.london_bicycles.cycle_hire
GROUP BY
  start_station_name
ORDER BY
  num_trips DESC
LIMIT
  5

Now you can list the job details by invoking the REST API:9

JOBID=8adbf3fd-e310-44bb-9c6e-88254958ccac   # CHANGE
access_token=$(gcloud auth application-default print-access-token)
PROJECT=$(gcloud config get-value project)
curl --silent \
     -H "Authorization: Bearer $access_token" \
     -X GET \
     "https://www.googleapis.com/bigquery/v2/projects/$PROJECT/jobs/$JOBID"

For example, the data about the first stage includes the following information about the time spent waiting for data, the ratio of I/O to compute in the query, and data read, shuffled, and written out:

"waitRatioAvg": 0.058558558558558557,
"readRatioAvg": 0.070270270270270274,
"computeRatioAvg": 0.86036036036036034
...
"shuffleOutputBytes": "356596",
"shuffleOutputBytesSpilled": "0",
"recordsRead": "24369201",
"recordsWritten": "6138",
"parallelInputs": "7",

Because this is primarily an input stage, we care primarily about read and shuffle performance—because this stage kept the processor busy 86% of the time and no data needed to be spilled to disk, it appears that bottlenecks (if any) in this query are not related to I/O. If this query is slow, we'll need to look elsewhere, perhaps at the relatively low degree of parallelism (seven parallel inputs); in this case, the low count is fine because the input data of 24 million records is rather small and quite capable of being processed in seven chunks.

Visualizing the query plan information

Much of the information about the query plan in the job details is represented visually in the BigQuery web UI. Open the query and click the “Execution details” tab to see a depiction of the query plan. This includes the overall timing as well as a breakdown of this timing at the level of the steps that make up each stage of the query.

Tip

As Figure 7-4 shows, the “Execution details” tab provides key information about query performance.

Figure 7-4. Key measures from the query execution details tab; performance optimization will typically focus on reducing the slot time and/or bytes shuffled

The overall timing depicted in Figure 7-5 indicates that seven parallel inputs are active in the first 0.6 seconds, after which two more units of work are activated.

Figure 7-5. Timing data from the query plan information

Further details of the stages and steps are provided visually as well, as illustrated in Figure 7-6.

Figure 7-6. Stages and steps from the query plan information

From Figure 7-6, we see that the query has three stages. The first stage (S00) is the input stage, the second (S01) consists of sorting, and the third (S02) is the output stage. The timing of each stage is shown visually as well in Figure 7-6. The ratios that were available numerically in the JSON response to the job details request are depicted in the color bars shown in Figure 7-7.10

Figure 7-7. Stage timing from the query plan information

The dark colors indicate the average time spent waiting, reading, computing, or writing. Here, most of the time is spent performing computation, and a little bit of time is spent waiting. The wait stage is somewhat variable (the intermediate color indicates the maximum wait, and the difference between the average and the maximum is an indicator of variability)—here, the maximum wait time is nearly double the average wait time. Reading, on the other hand, is quite consistent, whereas the writing overhead is negligible.

Zooming in on the input stage, shown in Figure 7-8, we see that it consists of three steps: reading two columns (duration and start_station_name, referred to as $1 and $2) from the BigQuery table, aggregating, and writing the output.

Figure 7-8. The steps that form the first stage

The aggregation step consists of three operations: grouping by start_station_name (recall that $2 refers to this input column), finding the average duration ($1) within each shard, and maintaining the count of non-null durations. The writing step writes the groups ($30), average duration ($20), and count of duration ($21) to an intermediate output, distributing the output by the hash of the station name. If more than one worker is needed in the next stage, the hash of the station name controls which workers process which parts of the stage_00 output.

Looking back again at Figure 7-6, you can see that the first stage is carried out in parallel over seven workers, while the remaining two stages are carried out on a single worker. The input stage reads in 24.4 million rows and writes out around 6,140 rows, which totals 348 KB. These rows are sorted by the second stage, and five rows are written to the third stage. When we have a stage with only one worker, we should ensure that the memory load on that worker is well within the bounds of what a single worker can handle (we cover ways to do so later in this chapter)—348 KB definitely qualifies, and so this query should not pose any performance issues due to limits on the resources available to a single worker.

Another option to visualize the BigQuery query plan is to use the BigQuery Visualizer at https://bqvisualiser.appspot.com/, as shown in Figure 7-9.

Figure 7-9. Visualizing a BigQuery job using the BigQuery Visualizer

The visualizer becomes especially useful for complex queries with tens of stages, which can be difficult to comprehend from just the synopsis available in the BigQuery web UI.

Increasing Query Speed

As discussed in the previous section, you should carry out the following steps to measure query speed and identify potential problems:

  1. Measure the overall workload time using the BigQuery Workload Tester.

  2. Examine the logs to ensure that the workload is not performing any unexpected operations.

  3. Examine the query plan information of the queries that form the workload to identify bottlenecks or unnecessary stages.

Once you have identified that a problem exists and have determined that there are no obvious errors in the workflow, it is time to consider how to improve speed on the critical parts of the workload. In this section, we provide a few possible ways to improve query speed, including the following:

  • Minimizing I/O

  • Caching the results of previous queries

  • Performing efficient joins

  • Avoiding overwhelming a worker

  • Using approximate aggregation functions

Minimizing I/O

As we noted earlier, a query that computes the sum of three columns will be slower than a query that computes the sum of two columns, but most of the performance difference will be due to reading more data, not the extra addition. Therefore, a query that computes the mean of a column will be nearly as fast as a query whose aggregation method is to compute the variance of the data (even though computing variance requires BigQuery to keep track of both the sum and the sum of the squares), because most of the overhead of simple queries is caused by I/O, not by computation.

Be purposeful in SELECT

Because BigQuery uses columnar file formats, the fewer the columns that are read in a SELECT, the less the amount of data that needs to be read. In particular, doing a SELECT * reads every column of every row in the table, making it quite slow and expensive. The exception is when you use a SELECT * in a subquery and then reference only a few fields in an outer query; the BigQuery optimizer will be smart enough to read only the columns that are absolutely required.
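
For instance, in the following sketch, even though the inner query selects every column, only the duration column should actually be read from storage, because that is the only column the outer query references:

SELECT MAX(duration) AS longest_duration
FROM (
  SELECT *
  FROM `bigquery-public-data`.london_bicycles.cycle_hire
)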

Explicitly list the columns that you want to see in the final result. For example, it is much more efficient to find the bike_id responsible for the longest duration trip in the dataset by doing the following:

SELECT
  bike_id
  , duration
FROM
  `bigquery-public-data`.london_bicycles.cycle_hire
ORDER BY duration DESC
LIMIT 1

A less efficient method is this:

SELECT
  *
FROM
  `bigquery-public-data`.london_bicycles.cycle_hire
ORDER BY duration DESC
LIMIT 1

The first query took us 1.8 seconds and cost 372 MB, whereas the second one took us 5.5 seconds (three times slower) and cost 2.59 GB (seven times costlier).

Tip

Unless you are reading from a clustered table, applying a LIMIT clause does not affect the amount of data you are billed for reading. When reading clustered tables, reductions in the number of bytes scanned will be passed along to you as savings (although they will be less predictable). To preview a table, use the preview button on the web UI instead of doing a SELECT * with a LIMIT. The preview does not incur charges, whereas a SELECT * incurs the same charge as a table scan.

If you require nearly all of the columns in a table, consider using SELECT * EXCEPT so as to avoid reading the ones you don’t require (see Chapter 2).
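
For example, if you want every column of the cycle hire table except the two station ID columns, a sketch using SELECT * EXCEPT would be:

SELECT * EXCEPT(start_station_id, end_station_id)
FROM `bigquery-public-data`.london_bicycles.cycle_hire
LIMIT 5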

Reducing data being read

When tuning a query, it is important to start with the data that is being read and consider whether it is possible to reduce it. Suppose that you want to find the typical duration of the most common one-way rentals; you could do the following:

SELECT
  MIN(start_station_name) AS start_station_name
  , MIN(end_station_name) AS end_station_name
  , APPROX_QUANTILES(duration, 10)[OFFSET(5)] AS typical_duration
  , COUNT(duration) AS num_trips
FROM
  `bigquery-public-data`.london_bicycles.cycle_hire
WHERE
  start_station_id != end_station_id 
GROUP BY
  start_station_id, end_station_id
ORDER BY num_trips DESC
LIMIT 10

This takes 14.7 seconds when we run it, and it yields the following:

Row start_station_name end_station_name typical_duration num_trips
1 Black Lion Gate, Kensington Gardens Hyde Park Corner, Hyde Park 1,500 12,000
2 Black Lion Gate, Kensington Gardens Palace Gate, Kensington Gardens 780 11,833
3 Hyde Park Corner, Hyde Park Albert Gate, Hyde Park 1,920 11,745
4 Hyde Park Corner, Hyde Park Triangle Car Park, Hyde Park 1,380 10,923
5 Hyde Park Corner, Hyde Park Black Lion Gate, Kensington Gardens 1,680 10,652

The details of the query indicate that the sorting (for the approximate quantiles for every station pair) requires a repartition of the outputs of the input stage, but most of the time is spent during computation, as demonstrated in Figure 7-10.

Figure 7-10. This query requires two repartition stages, but most of the time is spent in computation

Nevertheless, we can reduce the I/O overhead of the query if we do the filtering and grouping using the station name rather than the station ID, because we will need to read fewer columns:

SELECT
  start_station_name
  , end_station_name
  , APPROX_QUANTILES(duration, 10)[OFFSET(5)] AS typical_duration
  , COUNT(duration) AS num_trips
FROM
  `bigquery-public-data`.london_bicycles.cycle_hire
WHERE
  start_station_name != end_station_name 
GROUP BY
  start_station_name, end_station_name
ORDER BY num_trips DESC
LIMIT 10

This query avoids the need to read the two ID columns and finishes in 9.6 seconds, roughly a one-third reduction in query time. The speedup is caused by the downstream effects of reading less data: the query requires one less repartition, and fewer workers (10, versus 19 earlier) for the sort, as shown in Figure 7-11.

Figure 7-11. By taking advantage of the 1:1 relationship between station_id and station_name, we are able to read fewer columns, remove one stage, and use fewer workers for the sort

The query result remains the same because there is a 1:1 relationship between the station name and the station ID.

Reducing the number of expensive computations

Suppose that you want to find the total distance traveled by each bicycle in the dataset. A naive way to do this would be to find the distance traveled in each trip undertaken by each bicycle and sum up the distances:

WITH trip_distance AS (
  SELECT
     bike_id
     , ST_Distance(ST_GeogPoint(s.longitude, s.latitude),
                   ST_GeogPoint(e.longitude, e.latitude)) AS distance
  FROM
     `bigquery-public-data`.london_bicycles.cycle_hire,
     `bigquery-public-data`.london_bicycles.cycle_stations s,
     `bigquery-public-data`.london_bicycles.cycle_stations e
  WHERE 
     start_station_id = s.id
     AND end_station_id = e.id
)
 
SELECT 
  bike_id
  , SUM(distance)/1000 AS total_distance
FROM trip_distance
GROUP BY bike_id
ORDER BY total_distance DESC
LIMIT 5

This query takes 7.1 seconds (44 seconds of slot time) and shuffles 1.69 MB. The result is that some bicycles have been ridden nearly 6,000 kilometers:

Row bike_id total_distance
1 12925 5990.988493972133
2 12757 5919.736998793672
3 12496 5883.1268196056335
4 12841 5870.757769474104
5 13071 5853.763514457338

Computing the distance is a pretty expensive operation, and we can avoid joining the cycle_stations table against the cycle_hire table if we precompute the distances between all pairs of stations:

WITH stations AS (
  SELECT
     s.id AS start_id
     , e.id AS end_id
     , ST_Distance(ST_GeogPoint(s.longitude, s.latitude),
                   ST_GeogPoint(e.longitude, e.latitude)) AS distance
  FROM
     `bigquery-public-data`.london_bicycles.cycle_stations s,
     `bigquery-public-data`.london_bicycles.cycle_stations e
),

The rest of the query is quite similar, except that the join is against the table of precomputed distances:

trip_distance AS (
  SELECT
     bike_id
     , distance
  FROM
     `bigquery-public-data`.london_bicycles.cycle_hire,
     stations
  WHERE 
     start_station_id = start_id
     AND end_station_id = end_id
)
 
SELECT 
  bike_id
  , SUM(distance)/1000 AS total_distance
FROM trip_distance
GROUP BY bike_id
ORDER BY total_distance DESC
LIMIT 5

Now the query takes only 31.5 seconds of slot time (roughly a 30% reduction), in spite of having to shuffle more data (33.44 MB) between nodes.

Caching the Results of Previous Queries

The BigQuery service automatically caches query results in a temporary table. If the identical query is submitted within approximately 24 hours, the results are served from this temporary table without any recomputation. Cached results are extremely fast and do not incur charges.

There are, however, a few caveats to be aware of. Query caching is based on exact string comparison, so even differences in whitespace can cause a cache miss. Queries are never cached if they exhibit nondeterministic behavior (for example, they use CURRENT_TIMESTAMP or RAND), if the table or view being queried has changed (even if the columns/rows of interest to the query are unchanged), if the table is associated with a streaming buffer (even if there are no new rows), if the query uses Data Manipulation Language (DML) statements, or if it queries external data sources.
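
When benchmarking, you will often want to bypass the cache deliberately, just as we set useQueryCache to false earlier in this chapter. A sketch of the command-line equivalent is:

bq query --nouse_cache --use_legacy_sql=false \
'SELECT COUNT(*) FROM `bigquery-public-data`.london_bicycles.cycle_hire'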

Note

We recommend against reading directly from the cached temporary tables, because cached tables can expire—if the results of a query can serve as inputs to other queries, we recommend the use of tables or materialized views, as discussed in the next section.

Caching intermediate results

It is possible to improve overall performance at the expense of increased I/O by taking advantage of temporary tables and materialized views. For example, suppose that you have a number of queries that start out by finding the typical duration of trips between a pair of stations:

WITH typical_trip AS (
SELECT
  start_station_name
  , end_station_name
  , APPROX_QUANTILES(duration, 10)[OFFSET(5)] AS typical_duration
  , COUNT(duration) AS num_trips
FROM
  `bigquery-public-data`.london_bicycles.cycle_hire
GROUP BY
  start_station_name, end_station_name
)

The WITH clause (also called a common table expression) improves readability but does not improve query speed or cost because results are not cached. The same applies to views and subqueries as well. If you find yourself using a WITH clause, a view, or a subquery often, one way to potentially improve performance is to store the result in a table (or materialized view):11

CREATE OR REPLACE TABLE ch07eu.typical_trip AS
SELECT
  start_station_name
  , end_station_name
  , APPROX_QUANTILES(duration, 10)[OFFSET(5)] AS typical_duration
  , COUNT(duration) AS num_trips
FROM
  `bigquery-public-data`.london_bicycles.cycle_hire
GROUP BY
  start_station_name, end_station_name

Let’s use the WITH clause to find days when bicycle trips are much longer than usual:

SELECT 
   EXTRACT (DATE FROM start_date) AS trip_date
   , APPROX_QUANTILES(duration / typical_duration, 10)[OFFSET(5)] AS ratio
   , COUNT(*) AS num_trips_on_day
FROM 
  `bigquery-public-data`.london_bicycles.cycle_hire AS hire
JOIN typical_trip AS trip
ON 
   hire.start_station_name = trip.start_station_name 
   AND hire.end_station_name = trip.end_station_name
   AND num_trips > 10
GROUP BY trip_date
HAVING num_trips_on_day > 10
ORDER BY ratio DESC
LIMIT 10

This takes 19.1 seconds and processes 1.68 GB. Now, let’s use the table:

SELECT 
   EXTRACT (DATE FROM start_date) AS trip_date
   , APPROX_QUANTILES(duration / typical_duration, 10)[OFFSET(5)] AS ratio
   , COUNT(*) AS num_trips_on_day
FROM 
  `bigquery-public-data`.london_bicycles.cycle_hire AS hire
JOIN ch07eu.typical_trip AS trip
ON 
   hire.start_station_name = trip.start_station_name 
   AND hire.end_station_name = trip.end_station_name
   AND num_trips > 10
GROUP BY trip_date
HAVING num_trips_on_day > 10
ORDER BY ratio DESC
LIMIT 10

This takes 10.3 seconds (roughly half the time, because the typical_trip computation is no longer repeated) and processes 1.72 GB (a slight increase in cost because the new table is now being read). Both queries return the same result, that trips on Christmas take longer than usual:

Row trip_date ratio num_trips_on_day
1 2016-12-25 1.6 34477
2 2015-12-25 1.5263157894736843 20871
3 2015-08-01 1.25 41200
4 2016-07-30 1.2272727272727273 43524
5 2015-08-02 1.2222222222222223 41243

The table ch07eu.typical_trip is not refreshed when new data is added to the cycle_hire table. One way to solve this problem of stale data is to use a materialized view or to schedule queries to update the table periodically. You should measure the cost of such updates to see whether the improvement in query performance makes up for the extra cost of keeping the intermediate table up to date.
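
As a sketch of the scheduled-refresh approach (you could run this from cron, Cloud Scheduler, or a BigQuery scheduled query; the overwrite semantics come from the --replace flag):

bq query --use_legacy_sql=false \
  --destination_table=ch07eu.typical_trip --replace \
'SELECT
   start_station_name
   , end_station_name
   , APPROX_QUANTILES(duration, 10)[OFFSET(5)] AS typical_duration
   , COUNT(duration) AS num_trips
 FROM `bigquery-public-data`.london_bicycles.cycle_hire
 GROUP BY start_station_name, end_station_name'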

Accelerating queries with BI Engine

If there are tables that you access frequently in Business Intelligence (BI) settings, such as dashboards with aggregations and filters, one way to speed up your queries is to employ BI Engine. It will automatically store relevant pieces of data in memory (either actual columns from the table or derived results) and will use a specialized query processor tuned for working with mostly in-memory data. You can use the BigQuery Admin Console to reserve the amount of memory (up to a current maximum of 10 GB) that BigQuery should use for its cache, as depicted in Figure 7-12.

Figure 7-12. Reserve memory for caching table data by setting up a BI Engine reservation

Make sure to reserve this memory in the same region as the dataset you are querying. Then BigQuery will start to cache tables, parts of tables, and aggregations in memory and serve results faster.

A primary use case for BI Engine is for tables that are accessed from dashboard tools such as Google Data Studio. By providing memory allocation for a BI Engine reservation, you can make dashboards that rely on a BigQuery backend much more responsive.

Performing Efficient Joins

Joining two tables requires data coordination and is subject to limitations imposed by the communication bandwidth between slots. If it is possible to avoid a join, or to reduce the amount of data being joined, do so.

Denormalization

One way to improve the read performance and avoid joins is to give up on storing data efficiently and instead add redundant copies of data. This is called denormalization. Thus, instead of storing the bicycle station latitudes and longitudes separately from the cycle hire information, we could create a denormalized table:

CREATE OR REPLACE TABLE ch07eu.london_bicycles_denorm AS
SELECT
  start_station_id
  , s.latitude AS start_latitude
  , s.longitude AS start_longitude
  , end_station_id
  , e.latitude AS end_latitude
  , e.longitude AS end_longitude 
FROM
  `bigquery-public-data`.london_bicycles.cycle_hire as h
JOIN
  `bigquery-public-data`.london_bicycles.cycle_stations as s
ON
  h.start_station_id = s.id
JOIN
  `bigquery-public-data`.london_bicycles.cycle_stations as e
ON
  h.end_station_id = e.id

Then all subsequent queries will not need to carry out the join because the table will contain the necessary location information for all trips:

Row start_station_id start_latitude start_longitude end_station_id end_latitude end_longitude
1 439 51.5338 -0.118677 680 51.47768469 -0.170329317
2 597 51.473471 -0.20782 622 51.50748124 -0.205535908
3 187 51.49247977 -0.178433004 187 51.49247977 -0.178433004
4 15 51.51772703 -0.127854211 358 51.516226 -0.124826
5 638 51.46663393 -0.169821175 151 51.51213691 -0.201554966

In this case, you are trading off storage and reading more data for the computational expense of a join. It is quite possible that the cost of reading more data from disk will outweigh the cost of the join—you should measure whether denormalization brings performance benefits.

Avoiding self-joins of large tables

Self-joins happen when a table is joined with itself. Although BigQuery supports self-joins, they can lead to performance degradation if the table being joined with itself is very large. In many cases, you can avoid the self-join by taking advantage of SQL features such as aggregation and window functions.

Let’s look at an example. One of the BigQuery public datasets is the dataset of baby names published by the US Social Security Administration. It is possible to query the dataset to find the most common male names for the year 2015 in the state of Massachusetts:

SELECT 
  name
  , number AS num_babies
FROM `bigquery-public-data`.usa_names.usa_1910_current
WHERE gender = 'M' AND year = 2015 AND state = 'MA'
ORDER BY num_babies DESC
LIMIT 5

Here’s the result of that query:

Row name num_babies
1 Benjamin 456
2 William 445
3 Noah 403
4 Mason 365
5 James 354

Similarly, the most common female names (gender = 'F') for the year 2015 in Massachusetts were as follows:

Row name num_babies
1 Olivia 430
2 Emma 402
3 Sophia 373
4 Isabella 350
5 Charlotte 344

What are the most common names assigned to both male and female babies in the country over all the years in the dataset? A naive way to solve this problem involves reading the input table twice and doing a self-join:

WITH male_babies AS (
SELECT 
  name
  , number AS num_babies
FROM `bigquery-public-data`.usa_names.usa_1910_current
WHERE gender = 'M'
),
female_babies AS (
SELECT 
  name
  , number AS num_babies
FROM `bigquery-public-data`.usa_names.usa_1910_current
WHERE gender = 'F'
),
both_genders AS (
SELECT 
  name
  , SUM(m.num_babies) + SUM(f.num_babies) AS num_babies
  , SUM(m.num_babies) / (SUM(m.num_babies) + SUM(f.num_babies)) AS frac_male
FROM male_babies AS m
JOIN female_babies AS f
USING (name)
GROUP BY name
)
 
SELECT * FROM both_genders
WHERE frac_male BETWEEN 0.3 and 0.7
ORDER BY num_babies DESC
LIMIT 5

This took 74 seconds and yielded the following:

Row name num_babies frac_male
1 Jordan 982149616 0.6705115608373867
2 Willie 940460442 0.5722103705452823
3 Lee 820214744 0.689061146650151
4 Jessie 759150003 0.5139710590240227
5 Marion 592706454 0.32969114589732473

To add insult to injury, the answer is also wrong: as much as we like the name Jordan, the entire US population is only around 330 million, so there cannot have been 982 million babies with that name. The self-join unfortunately joins across state and year boundaries.12

A faster, more elegant (and correct!) solution is to recast the query to read the input only once and avoid the self-join completely. This took only 2.4 seconds—a 30-times increase in speed:

WITH all_babies AS (
SELECT 
  name 
  , SUM(IF(gender = 'M', number, 0)) AS male_babies
  , SUM(IF(gender = 'F', number, 0)) AS female_babies
FROM `bigquery-public-data.usa_names.usa_1910_current`
GROUP BY name
),
 
both_genders AS (
SELECT
  name
  , (male_babies + female_babies) AS num_babies
  , SAFE_DIVIDE(male_babies, male_babies + female_babies) AS frac_male
FROM all_babies
WHERE male_babies > 0 AND female_babies > 0
)
 
SELECT * FROM both_genders
WHERE frac_male BETWEEN 0.3 and 0.7
ORDER BY num_babies DESC
LIMIT 5

Here’s the result, in case you’re curious:

Row name num_babies frac_male
1 Jessie 229263 0.4327213723976394
2 Riley 187762 0.46760792918694943
3 Casey 181176 0.5916456925862145
4 Jackie 161428 0.4624042916966078
5 Johnnie 136208 0.6842549629977681

Reducing the data being joined

It is possible to carry out the previous query with an efficient join as long as we reduce the amount of data being joined by grouping the data by name and gender early on:

with all_names AS (
  SELECT name, gender, SUM(number) AS num_babies
  FROM `bigquery-public-data`.usa_names.usa_1910_current
  GROUP BY name, gender
),
 
male_names AS (
   SELECT name, num_babies
   FROM all_names
   WHERE gender = 'M'
),
 
female_names AS (
   SELECT name, num_babies
   FROM all_names
   WHERE gender = 'F'
),
 
ratio AS (
  SELECT 
    name
    , (f.num_babies + m.num_babies) AS num_babies
    , m.num_babies / (f.num_babies + m.num_babies) AS frac_male
  FROM male_names AS m
  JOIN female_names AS f
  USING (name)
)
 
SELECT * FROM ratio
WHERE frac_male BETWEEN 0.3 and 0.7
ORDER BY num_babies DESC
LIMIT 5

The early grouping serves to trim the data early in the query, before the query performs a JOIN. That way, shuffling and other complex operations execute only on the much smaller data and remain quite efficient. This query finished in two seconds and returned the correct result.

Using a window function instead of self-join

Suppose that you want to find the duration between a bike being dropped off and it being rented again; in other words, the duration that a bicycle stays at the station. This is an example of a dependent relationship between rows. It might appear that the only way to solve this is to join the table with itself, matching the end_date of one trip against the start_date of the next.

You can, however, avoid a self-join by using a window function (we cover window functions in Chapter 8):

SELECT
  bike_id
  , start_date
  , end_date
  , TIMESTAMP_DIFF(
       start_date, 
       LAG(end_date) OVER (PARTITION BY bike_id ORDER BY start_date),
       SECOND) AS time_at_station
FROM `bigquery-public-data`.london_bicycles.cycle_hire
LIMIT 5

Here’s the result of that query:

Row bike_id start_date end_date time_at_station
1 2 2015-01-05 15:59:00 UTC 2015-01-05 16:17:00 UTC null
2 2 2015-01-07 01:31:00 UTC 2015-01-07 01:50:00 UTC 119640
3 2 2015-01-21 07:56:00 UTC 2015-01-21 08:12:00 UTC 1231560
4 2 2015-01-21 16:15:00 UTC 2015-01-21 16:31:00 UTC 28980
5 2 2015-01-21 16:57:00 UTC 2015-01-21 17:23:00 UTC 1560

Notice that the first row has a null for time_at_station because we don’t have a timestamp for the previous dropoff. After that, the time_at_station tracks the difference between the previous dropoff and the current pickup.

Using this, we can compute the average time that a bicycle is unused at each station and rank stations by that measure:

WITH unused AS (
SELECT
  bike_id
  , start_station_name
  , start_date
  , end_date
  , TIMESTAMP_DIFF(start_date, LAG(end_date) OVER (PARTITION BY bike_id ORDER BY
start_date), SECOND) AS time_at_station
FROM `bigquery-public-data`.london_bicycles.cycle_hire
)
 
SELECT
  start_station_name
  , AVG(time_at_station) AS unused_seconds
FROM unused
GROUP BY start_station_name
ORDER BY unused_seconds ASC
LIMIT 5

From this query, we learn that bicycles turn over the fastest at the following stations:

Row start_station_name unused_seconds
1 LSP1 1500.0
2 Wormwood Street, Liverpool Street 4605.427372968633
3 Hyde Park Corner, Hyde Park 5369.884926322234
4 Speakers’ Corner 1, Hyde Park 6203.571977906734
5 Albert Gate, Hyde Park 6258.720194303267

Joining with precomputed values

Sometimes it can be helpful to precompute functions on smaller tables and then join with the precomputed values rather than repeat an expensive calculation each time.

For example, suppose that you want to find the pair of stations between which our customers ride bicycles at the fastest pace. To compute the pace13 (minutes per kilometer) at which they ride, we need to divide the duration of the ride by the distance between stations.

We could create a denormalized table with distances between stations and then compute the average pace:

with denormalized_table AS (
  SELECT
    start_station_name
    , end_station_name
    , ST_DISTANCE(ST_GeogPoint(s1.longitude, s1.latitude),
                  ST_GeogPoint(s2.longitude, s2.latitude)) AS distance
    , duration
 FROM
    `bigquery-public-data`.london_bicycles.cycle_hire AS h
 JOIN
     `bigquery-public-data`.london_bicycles.cycle_stations AS s1
 ON h.start_station_id = s1.id
  JOIN
     `bigquery-public-data`.london_bicycles.cycle_stations AS s2
 ON h.end_station_id = s2.id
),
 
durations AS (
  SELECT
    start_station_name
    , end_station_name
    , MIN(distance) AS distance
    , AVG(duration) AS duration
    , COUNT(*) AS num_rides
  FROM
     denormalized_table
  WHERE
     duration > 0 AND distance > 0
  GROUP BY start_station_name, end_station_name
  HAVING num_rides > 100
)
 
SELECT
    start_station_name
    , end_station_name
    , distance
    , duration
    , duration/distance AS pace
FROM durations
ORDER BY pace ASC
LIMIT 5

This query invokes the geospatial function ST_DISTANCE once for each row in the cycle_hire table (24 million times), takes 16.1 seconds, and processes 1.86 GB.

Alternatively, we can use the cycle_stations table to precompute the distance between every pair of stations (this is a self-join) and then join it with the reduced-size table of average duration between stations:

with distances AS (
  SELECT 
    a.id AS start_station_id
    , a.name AS start_station_name
    , b.id AS end_station_id
    , b.name AS end_station_name
    , ST_DISTANCE(ST_GeogPoint(a.longitude, a.latitude),
                ST_GeogPoint(b.longitude, b.latitude)) AS distance
 FROM
    `bigquery-public-data`.london_bicycles.cycle_stations a
 CROSS JOIN
     `bigquery-public-data`.london_bicycles.cycle_stations b
 WHERE a.id != b.id
),
 
durations AS (
  SELECT
    start_station_id
    , end_station_id
    , AVG(duration) AS duration
    , COUNT(*) AS num_rides
  FROM
    `bigquery-public-data`.london_bicycles.cycle_hire
  WHERE
    duration > 0
  GROUP BY start_station_id, end_station_id
  HAVING num_rides > 100
)
 
SELECT
    start_station_name
    , end_station_name
    , distance
    , duration
    , duration/distance AS pace
FROM distances
JOIN durations
USING (start_station_id, end_station_id)
ORDER BY pace ASC
LIMIT 5

The recast query with the more efficient joins takes only 5.4 seconds, an increase in speed of three times, and processes 554 MB, a reduction in cost of nearly four times.

JOIN versus denormalization

What if we were to store the distance traveled in each trip in a denormalized table?

CREATE OR REPLACE TABLE ch07eu.cycle_hire AS
SELECT
  start_station_name
  , end_station_name
  , ST_DISTANCE(ST_GeogPoint(s1.longitude, s1.latitude),
               ST_GeogPoint(s2.longitude, s2.latitude)) AS distance
  , duration
FROM
  `bigquery-public-data`.london_bicycles.cycle_hire AS h
JOIN
  `bigquery-public-data`.london_bicycles.cycle_stations AS s1
ON h.start_station_id = s1.id
JOIN
  `bigquery-public-data`.london_bicycles.cycle_stations AS s2
ON h.end_station_id = s2.id

Querying this table returns results in 8.7 seconds and processes 1.6 GB—in other words, it's 60% slower and about three times more expensive than the previous query. In this instance, therefore, joining with a smaller table turns out to be more efficient than querying a larger, denormalized table. However, this is the sort of thing that you need to measure for your particular use case. You will see later how you can efficiently store data at differing levels of granularity in a single denormalized table with nested and repeated fields.

Avoiding Overwhelming a Worker

Some operations (e.g., ordering) need to be carried out on a single worker. Having to sort too much data can overwhelm a worker’s memory and result in a “resources exceeded” error. Avoid overwhelming the worker with too much data. As the hardware in Google datacenters is upgraded, what “too much” means in this context expands over time. Currently, this is on the order of one gigabyte.

Limiting large sorts

Suppose that you want to go through the bike rentals and number them 1, 2, 3, and so on, in the order that the rentals ended. We could do that by using the ROW_NUMBER() function (we cover window functions in Chapter 8):

SELECT 
  rental_id
  , ROW_NUMBER() OVER(ORDER BY end_date) AS rental_number
FROM `bigquery-public-data`.london_bicycles.cycle_hire
ORDER BY rental_number ASC
LIMIT 5

Here’s the result:

Row rental_id rental_number
1 40346512 1
2 40346508 2
3 40346519 3
4 40346510 4
5 40346520 5

However, this query takes 29.9 seconds to process just 372 MB because it needs to sort the entirety of the london_bicycles dataset on a single worker. Had we processed a larger dataset, it would have overwhelmed that worker.

In such cases, we might want to consider whether it is possible to limit the large sorts and distribute them. Indeed, it is possible to extract the date from the rentals and then sort trips within each day:

WITH rentals_on_day AS (
SELECT 
  rental_id
  , end_date
  , EXTRACT(DATE FROM end_date) AS rental_date
FROM `bigquery-public-data.london_bicycles.cycle_hire`
)
 
SELECT 
  rental_id
  , rental_date
  , ROW_NUMBER() OVER(PARTITION BY rental_date ORDER BY end_date) AS
rental_number_on_day
FROM rentals_on_day
ORDER BY rental_date ASC, rental_number_on_day ASC
LIMIT 5

This takes 8.9 seconds (an increase in speed of three times) because the sorting can be done on just a single day of data at a time. It yields the rental number on a day-by-day basis:

Row rental_id rental_date rental_number_on_day
1 40346512 2015-01-04 1
2 40346508 2015-01-04 2
3 40346519 2015-01-04 3
4 40346510 2015-01-04 4
5 40346520 2015-01-04 5

Data skew

The same problem of overwhelming a worker (in this case, overwhelming the memory of the worker) can happen during an ARRAY_AGG with GROUP BY if one of the keys is much more common than the others.14

Because there are more than three million GitHub repositories and the commits are well distributed among them, this query succeeds:

SELECT 
  repo_name
  , ARRAY_AGG(STRUCT(author, committer, subject, message, trailer, difference,
encoding) ORDER BY author.date.seconds)
FROM `bigquery-public-data.github_repos.commits`, UNNEST(repo_name) AS repo_name
GROUP BY repo_name

However, most of the people using GitHub live in only a few time zones, so grouping by the time zone fails—we are asking a single worker to sort a significant fraction of 750 GB:

SELECT 
  author.tz_offset, ARRAY_AGG(STRUCT(author, committer, subject, message, 
trailer, difference, encoding) ORDER BY author.date.seconds)
FROM `bigquery-public-data.github_repos.commits`
GROUP BY author.tz_offset

One solution is to add a LIMIT to the ORDER BY:

SELECT 
  author.tz_offset, ARRAY_AGG(STRUCT(author, committer, subject, message, 
trailer, difference, encoding) ORDER BY author.date.seconds LIMIT 1000)
FROM `bigquery-public-data.github_repos.commits`
GROUP BY author.tz_offset

If you do require sorting all of the data, use more granular keys (i.e., distribute the group’s data over more workers) and then aggregate the results corresponding to the desired key. For example, instead of grouping only by the time zone, it is possible to group by time zone and repo_name and then aggregate across repositories to get the actual answer for each time zone:

SELECT 
  repo_name, author.tz_offset
  , ARRAY_AGG(STRUCT(author, committer, subject, message, trailer, difference,
encoding) ORDER BY author.date.seconds)
FROM `bigquery-public-data.github_repos.commits`, UNNEST(repo_name) AS repo_name
GROUP BY repo_name, author.tz_offset

Optimizing user-defined functions

Invoking a JavaScript user-defined function (UDF) requires a V8 subprocess to be launched, and this degrades performance. JavaScript UDFs are computationally expensive and have access to limited memory, so reducing the amount of data processed by the UDF can help improve performance.

Although BigQuery supports UDFs in JavaScript, opt to write your UDFs using SQL wherever possible; SQL is distributed and optimized by BigQuery natively. If you are writing a UDF in SQL, there is no performance difference between embedding the SQL function directly in the query or using temporary and permanent functions. The reason to use an SQL UDF is for reusability, composability, and readability.
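
As a minimal sketch of a SQL UDF (the function name is ours, and we assume that duration is stored as an integer number of seconds), you can define a temporary function inline and use it in the same query:

CREATE TEMP FUNCTION duration_minutes(duration_seconds INT64) AS (
  -- convert a trip duration from seconds to minutes
  duration_seconds / 60
);

SELECT
  bike_id
  , duration_minutes(duration) AS minutes
FROM `bigquery-public-data`.london_bicycles.cycle_hire
LIMIT 5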
 

Using Approximate Aggregation Functions

BigQuery provides fast, low-memory approximations of aggregate functions. Instead of using COUNT(DISTINCT …), we can use APPROX_COUNT_DISTINCT on large data streams when a small statistical uncertainty in the result is tolerable.

Approximate count

For example, you can find the number of unique GitHub repositories by using:

SELECT 
  COUNT(DISTINCT repo_name) AS num_repos
FROM `bigquery-public-data`.github_repos.commits, UNNEST(repo_name) AS repo_name

This query takes 7.1 seconds to compute the correct result of 3,348,576. On the other hand, the following query takes 3.2 seconds (an increase in speed of two times) and returns an approximate result of 3,400,927, which overestimates the correct answer by 1.5%:

SELECT 
  APPROX_COUNT_DISTINCT(repo_name) AS num_repos
FROM `bigquery-public-data`.github_repos.commits, UNNEST(repo_name) AS repo_name

On smaller datasets, however, there might be no advantage. Let’s look at an example that finds the total number of unique bicycles in the london_bicycles dataset:

SELECT 
  COUNT(DISTINCT bike_id) AS num_bikes
FROM `bigquery-public-data`.london_bicycles.cycle_hire

This takes 0.9 seconds and returns the correct result of 13,705. Using the approximate counterpart takes 1.6 seconds (which is slower than the exact query) and returns an approximate result of 13,699:

SELECT 
  APPROX_COUNT_DISTINCT(bike_id) AS num_bikes
 FROM `bigquery-public-data`.london_bicycles.cycle_hire
Tip

The approximate algorithm is much more efficient than the exact algorithm only on large datasets and is recommended in use cases for which errors of approximately 1% are tolerable. Before using the approximate function, always measure on your use case!

Approximate top

Other available approximate functions include APPROX_QUANTILES to compute percentiles, APPROX_TOP_COUNT to find the most frequent elements, and APPROX_TOP_SUM to find the top elements based on the sum of an accompanying weight field. 
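
For example, here is a sketch of APPROX_QUANTILES computing approximate deciles of trip duration in the london_bicycles dataset; the function returns an array of 11 values (the minimum, the nine decile boundaries, and the maximum):

SELECT 
  APPROX_QUANTILES(duration, 10) AS duration_deciles
FROM `bigquery-public-data`.london_bicycles.cycle_hire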

Here’s an example of using APPROX_TOP_COUNT to find the five most frequently rented bicycles:

SELECT 
  APPROX_TOP_COUNT(bike_id, 5) AS num_bikes
FROM `bigquery-public-data`.london_bicycles.cycle_hire

This yields the following:

Row num_bikes.value num_bikes.count
1 12925 2922
  12841 2489
  13071 2474
  12926 2467
  12991 2444

Note that the result is a single row and consists of an array of values so that ordering is preserved.

Tip

If your queries are taking too long, you can use APPROX_TOP_COUNT to check whether data skew is the reason. If so, consider the tips earlier in the chapter on dealing with data skew by carrying out the operation at a more granular level or using LIMIT to reduce the data being processed.
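
For example, a quick way to confirm the skew in the GitHub query from earlier in this chapter is to look at how heavily the most common grouping keys dominate (a sketch):

SELECT 
  APPROX_TOP_COUNT(author.tz_offset, 5) AS most_common_tz
FROM `bigquery-public-data.github_repos.commits`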

To find the top five stations based on duration of bicycle rentals, you can use APPROX_TOP_SUM:

SELECT 
  APPROX_TOP_SUM(start_station_name, duration, 5) AS num_bikes
FROM `bigquery-public-data`.london_bicycles.cycle_hire
WHERE duration > 0

Here is the result of that query:

Row num_bikes.value num_bikes.sum
1 Hyde Park Corner, Hyde Park 600037440
  Black Lion Gate, Kensington Gardens 581085720
  Albert Gate, Hyde Park 367235700
  Speakers’ Corner 1, Hyde Park 318485820
  Speakers’ Corner 2, Hyde Park 268442640

HLL functions

In addition to the just-described APPROX_* functions (which carry out the entire approximate aggregation algorithm), BigQuery also supports the HyperLogLog++ (HLL++) algorithm, which allows you to break down the count-distinct problem into three separate operations:

  1. Initialize a set, called an HLL sketch, by adding elements to it using HLL_COUNT.INIT

  2. Find the cardinality (count) of an HLL sketch by using HLL_COUNT.EXTRACT

  3. Merge multiple HLL sketches into a single sketch by using HLL_COUNT.MERGE_PARTIAL

In addition, HLL_COUNT.MERGE combines steps 2 and 3, computing the count from a set of HLL sketches.

For example, here’s a query that finds the count of distinct stations in the London bicycles table regardless of whether a trip started or ended at the station:

WITH sketch AS (
SELECT
    HLL_COUNT.INIT(start_station_name) AS hll_start
   , HLL_COUNT.INIT(end_station_name) AS hll_end
FROM `bigquery-public-data`.london_bicycles.cycle_hire
)
 
SELECT 
  HLL_COUNT.MERGE(hll_start) AS distinct_start
  , HLL_COUNT.MERGE(hll_end) AS distinct_end
  , HLL_COUNT.MERGE(hll_both) AS distinct_station
FROM sketch, UNNEST([hll_start, hll_end]) AS hll_both

This returns the following:

Row distinct_start distinct_end distinct_station
1 880 882 882

Of course, you also can achieve this by using APPROX_COUNT_DISTINCT directly:

SELECT
   APPROX_COUNT_DISTINCT(start_station_name) AS distinct_start
  , APPROX_COUNT_DISTINCT(end_station_name) AS distinct_end
  , APPROX_COUNT_DISTINCT(both_stations) AS distinct_station
FROM 
  `bigquery-public-data`.london_bicycles.cycle_hire
  , UNNEST([start_station_name, end_station_name]) AS both_stations

This yields the same result and is much simpler to read and understand. Mostly, therefore, you would use the APPROX_ variants.

One reason to use the HLL functions might be that you need to carry out the aggregation manually or avoid storing specific columns. Suppose that your data has the schema user_id, date, product, country, and you need to compute the number of distinct users. However, the column user_id is personally identifying, and so you would prefer not to store it indefinitely. In such a case, you can compute and store a manual aggregation using HLL_COUNT.INIT (the source table name in this sketch is a placeholder):

CREATE OR REPLACE TABLE approx_distinct_users_agg AS 
SELECT date, product, country, HLL_COUNT.INIT(user_id) AS sketch
FROM user_activity  -- placeholder for the table holding the raw events
GROUP BY date, product, country

Now you don’t need to store user_id; you only need to store the sketch. Whenever you need to compute any higher-level aggregation, you can do the following:

SELECT date, HLL_COUNT.MERGE(sketch)
FROM approx_distinct_users_agg 
GROUP BY date
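
If you need the per-group counts rather than a rolled-up number, HLL_COUNT.EXTRACT (step 2 in the list earlier) reads the approximate cardinality out of each stored sketch without any further merging. A minimal sketch:

SELECT date, product, country, HLL_COUNT.EXTRACT(sketch) AS approx_users
FROM approx_distinct_users_agg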

Optimizing How Data Is Stored and Accessed

In the previous section, we discussed how to improve query performance, but we limited ourselves to methods that do not change the layout of the table, where it is stored, or how it is accessed. In this section, we look at how addressing these factors can have a dramatic impact on query performance. Obviously, you will want to keep these tips in mind as you design your tables, because changing the schema of a table tends to break existing queries.

Minimizing Network Overhead

BigQuery is a regional service that is globally accessible. If you are querying a dataset that resides in the EU region, for example, the query will run on computational resources that are located in an EU datacenter. If you store the results of the query into a destination table, that table must be in a dataset that is also in the EU. You can, however, invoke the BigQuery REST API (i.e., invoke the query) from anywhere in the world, even from machines outside of GCP.

When working with other GCP resources such as Google Cloud Storage or Cloud Pub/Sub, the best performance will be obtained if you also locate them in the same region as the dataset. Thus, for example, if you are invoking a query from a Compute Engine instance or a Cloud Dataproc cluster, network overhead will be minimized if the instance or cluster is also located in the same region as the dataset being queried.

If you’re invoking BigQuery from outside GCP, consider the network topology and try to minimize the number of hops between the client machine and the GCP datacenter in which the dataset resides.

Compressed, partial responses

When invoking the REST API directly, you can minimize network overhead by accepting compressed, partial responses. To accept compressed responses, you can specify in the HTTP header that you will accept gzip and make sure that the string “gzip” appears in the name of the user-agent—for example:

Accept-Encoding: gzip
User-Agent: programName (gzip)

Then the responses are compressed using gzip.

By default, responses from BigQuery contain all of the fields promised in the documentation. However, if we know what part of the response we are interested in, we can ask BigQuery to send back only that bit of the response, thus lowering the network overhead. For example, earlier in this chapter, we looked at how to get back the complete job details using the Jobs API. If you are interested in only a subset of the full response (for example, only the steps in the query plan), you can specify the field(s) of interest to limit the size of the response:15

JOBSURL="https://www.googleapis.com/bigquery/v2/projects/$PROJECT/jobs"
FIELDS="statistics(query(queryPlan(steps)))"
curl --silent \
    -H "Authorization: Bearer $access_token" \
    -H "Accept-Encoding: gzip" \
    -H "User-Agent: get_job_details (gzip)" \
    -X GET \
    "${JOBSURL}/${JOBID}?fields=${FIELDS}" \
| zcat

Note that we are also specifying that we accept gzip encoding.

Batching multiple requests

When using the REST API, it is possible to batch multiple BigQuery API calls by using the multipart/mixed content type and nesting HTTP requests in each of the parts. The body of each part specifies the HTTP operation (GET, PUT, etc.), path portion of the URL, headers, and body. The server’s response is a single HTTP response with a multipart/mixed content type, with the parts being responses (in order) to the requests that form the batched request. Even though the responses are in order, the server might execute the calls in any order. Thus you should treat the batched request as a parallel execution.

Here’s an example of sending a batched request to get some query plan details of the last five queries in our project. You first use the BigQuery command-line tool to get the five most recent successful jobs:16

# The 5 most recent successful jobs
JOBS=$(bq ls -j -n 50 | grep SUCCESS | head -5 | awk '{print $1}')

The request goes to the batch endpoint for BigQuery:

BATCHURL="https://www.googleapis.com/batch/bigquery/v2"
JOBSPATH="/projects/$PROJECT/jobs"
FIELDS="statistics(query(queryPlan(steps)))"

Using the URL path, you can form the individual requests:

request=""
for JOBID in $JOBS; do
read -d '' part << EOF
 
--batch_part_starts_here
GET ${JOBSPATH}/${JOBID}?fields=${FIELDS}
 
EOF
request=$(echo "$request"; echo "$part")
done

Then you can send the request to the batch endpoint as a multipart request:

curl --silent \
    -H "Authorization: Bearer $access_token" \
    -H "Content-Type: multipart/mixed; boundary=batch_part_starts_here" \
    -X POST \
    -d "$request" \
    "${BATCHURL}"

Bulk reads using BigQuery Storage API

In Chapter 5, we discussed using the BigQuery REST API and associated client libraries to list table data and get query results. The REST API provides the data in record-oriented, paginated views that are best suited to relatively small result sets. Yet, with the advent of machine learning and distributed Extract, Transform, and Load (ETL) tools, external tools now require fast, efficient bulk access to BigQuery’s managed storage. Such bulk read access is provided by the BigQuery Storage API via a Remote Procedure Call (RPC)–based protocol. With the BigQuery Storage API, structured data is sent over the wire in a binary serialization format that maps more closely to the columnar format in which the data is stored. This allows for additional parallelism among multiple consumers of the same result set.

It is unlikely that you will use the BigQuery Storage API directly17 if you are an end user. Instead, you will take advantage of tools such as Cloud Dataflow, Cloud Dataproc, TensorFlow, AutoML, and others that employ the Storage API to read directly from BigQuery’s managed storage instead of going through the BigQuery API. Because the Storage API directly accesses stored data, permission to access the BigQuery Storage API is distinct from the existing BigQuery API. The BigQuery Storage API must be enabled independently of enabling BigQuery.

The BigQuery Storage API provides several benefits to tools that read directly from BigQuery’s managed storage. For example, consumers can read disjoint sets of rows from a table using multiple streams (enabling distributed reads from different workers in Cloud Dataproc, for example), dynamically shard these streams (thus reducing tail latency, which can be a significant problem for MapReduce jobs), select a subset of columns to read (enabling machine learning frameworks to read only the features used by the model), filter column values (reducing the data transmitted over the network), and still ensure snapshot consistency (i.e., read data as of a specific point in time).

In Chapter 5, we looked at using the Jupyter Magics %%bigquery to load the result of queries into pandas DataFrames. However, those datasets were relatively small—on the order of a dozen to a few hundred rows. What if you want to load the entire london_bicycles dataset (24 million rows) into a pandas DataFrame? In that case, it is possible to specify an option on the Magics to load the data into the pandas DataFrame using the Storage API rather than the BigQuery API. You will need to first install the Storage API Python client library with Avro and pandas support. You can do this within Jupyter by using the following:

%pip install google-cloud-bigquery-storage[fastavro,pandas]

Then use the %%bigquery Magics as before, but add the Storage API option:

%%bigquery df --use_bqstorage_api --project $PROJECT
SELECT 
  start_station_name 
  , end_station_name
  , start_date
  , duration
FROM `bigquery-public-data`.london_bicycles.cycle_hire

Note that we are taking advantage of the ability of the Storage API to provide direct access to individual columns; it is not necessary to read the entire BigQuery table into the pandas DataFrame. If it so happens that the amount of data returned by the query is comfortably small, the Magics falls back automatically to the BigQuery API. Therefore, there is no harm in always using this flag in your notebook cells. To turn --use_bqstorage_api on by default in all the Magics cells within a notebook session, you can set a context flag:

import google.cloud.bigquery.magics
google.cloud.bigquery.magics.context.use_bqstorage_api = True

Choosing an Efficient Storage Format

Query performance depends on where the table data is stored, and in what format. In general, the fastest performance is obtained if you store data in such a way that queries need to do very little seeking or type conversion.

Internal versus external data sources

Even though BigQuery supports querying directly from external sources such as Google Cloud Storage, Cloud Bigtable, and Google Sheets, the fastest query performance will be obtained if you use native tables.

We recommend that you use BigQuery as your analytics data warehouse for all your structured and semi-structured data. Use external data sources for staging (Google Cloud Storage), real-time ingest (Cloud Pub/Sub, Cloud Bigtable), or transactional updates (Cloud SQL, Cloud Spanner). Then, as described in Chapter 4, set up a periodic data pipeline to load the data from these external sources into BigQuery.

If your use case is such that you need to query the data from Google Cloud Storage, store it in a compressed, columnar format (e.g., Parquet) if you can. Use row-based formats such as JSON or comma-separated values (CSV) only as a last resort.
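
For example, assuming that Parquet files have been staged in a Cloud Storage bucket (the table name and bucket path below are placeholders) and that external-table DDL is available in your environment, you could define an external table over them as follows:

CREATE OR REPLACE EXTERNAL TABLE ch07.hires_parquet
OPTIONS (
  format = 'PARQUET',
  uris = ['gs://some_bucket/hires/*.parquet']  -- placeholder bucket and path
)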

Setting up life cycle management on staging buckets

If you are loading data into BigQuery by staging it in Google Cloud Storage first, consider deleting the Google Cloud Storage data after the data is loaded. If you are performing ETL on the data to load it into BigQuery (so that the data in BigQuery is heavily transformed or is only a subset), you might want to retain the raw data in Google Cloud Storage. In such cases, reduce costs by creating life cycle rules on buckets, to downgrade the Google Cloud Storage storage class.

To enable life cycle management on a bucket so that data in multiregional or standard classes older than 30 days is moved to Nearline Storage, and Nearline data older than 90 days is moved to Coldline Storage:

gsutil lifecycle set lifecycle.json gs://some_bucket/

In this example, the file lifecycle.json contains this content:

{
"lifecycle": {
  "rule": [
  {
     "action": {
       "type": "SetStorageClass",
       "storageClass": "NEARLINE"
     },
     "condition": {
       "age": 30,
       "matchesStorageClass": ["MULTI_REGIONAL", "STANDARD"]
     }
  },
  {
     "action": {
       "type": "SetStorageClass",
       "storageClass": "COLDLINE"
     },
     "condition": {
       "age": 90,
       "matchesStorageClass": ["NEARLINE"]
     }
  }
]}}

You can use life cycle management not just to change an object’s class but also to delete objects older than a certain threshold.18

Storing data as arrays of structs

One of the BigQuery public datasets is a dataset of cyclonic storms (hurricanes, typhoons, cyclones, etc.) as observed and measured by various meteorological agencies around the world. Cyclonic storms can last up to a few weeks, and observations are carried out once every three hours or so. Suppose that you want to query this dataset to find all of the storms in 2018, the maximum wind speed attained by each storm over its lifetime, and the time and location of the storm when this maximum wind speed was reached. This query pulls out the necessary information from the public dataset:

SELECT
  sid, number, basin, name,
  ARRAY_AGG(STRUCT(iso_time, usa_latitude, usa_longitude, usa_wind) ORDER BY
usa_wind DESC LIMIT 1)[OFFSET(0)].*
FROM
  `bigquery-public-data`.noaa_hurricanes.hurricanes
WHERE
  season = '2018'
GROUP BY
  sid, number, basin, name
ORDER BY number ASC

We are extracting the storm ID (sid), the storm number within the season, the basin, and the name of the storm (if named) and then finding the array of observations made of this storm, ranking the observations in descending order of wind speed, and taking the highest wind speed observed for each storm. The storms themselves are ordered by number. The result consists of 88 rows, and looks something like this:

Row sid number basin name iso_time usa_latitude usa_longitude usa_wind
1 2018002N09123 1 WP BOLAVEN 2018-01-02 18:00:00 UTC 9.7 117.2 29
2 2018003S15053 2 SI AVA 2018-01-05 06:00:00 UTC -17.9 50.0 93
3 2018006S13092 3 SI IRVING 2018-01-07 18:00:00 UTC -15.8 83.0 89
4 2018010S18123 4 SI JOYCE 2018-01-11 18:00:00 UTC -18.7 121.6 54

The query processed 41.7 MB and took 1.4 seconds. The first row is of a storm named Bolaven that reached a maximum wind speed of 29 knots on January 2, 2018, at 18:00 UTC.

Because observations are carried out by multiple meteorological agencies, it is possible to standardize this data using nested fields and store the structs in BigQuery, as follows:19

CREATE OR REPLACE TABLE ch07.hurricanes_nested AS
 
SELECT sid, season, number, basin, name, iso_time, nature, usa_sshs,
       STRUCT(usa_latitude AS latitude, usa_longitude AS longitude, usa_wind AS
wind, usa_pressure AS pressure) AS usa,
       STRUCT(tokyo_latitude AS latitude, tokyo_longitude AS longitude,
           tokyo_wind AS wind, tokyo_pressure AS pressure) AS tokyo,
       ... AS cma,
       ... AS hko,
       ... AS newdelhi,
       ... AS reunion,
       ... AS bom,
       ... AS wellington,
       ... AS nadi
FROM `bigquery-public-data`.noaa_hurricanes.hurricanes

Querying this table is similar to querying the original table, except that field names change a bit (usa.latitude instead of usa_latitude):

SELECT
  sid, number, basin, name,
  ARRAY_AGG(STRUCT(iso_time, usa.latitude, usa.longitude, usa.wind) ORDER BY
usa.wind DESC LIMIT 1)[OFFSET(0)].*
FROM
  ch07.hurricanes_nested
WHERE
  season = '2018'
GROUP BY
  sid, number, basin, name
ORDER BY number ASC

The query processes the same amount of data and takes the same time as the original query on the public dataset. Using nested fields (i.e., structs) does not change the query speed or cost, although it might make your query more readable.

Because there are multiple observations of the same storm over its life cycle, we can change the storage to have a single row per storm and store an array of observations for each storm:

CREATE OR REPLACE TABLE ch07.hurricanes_nested_track AS
 
SELECT sid, season, number, basin, name,
 ARRAY_AGG(
   STRUCT(
       iso_time,
       nature,
       usa_sshs,
       STRUCT(usa_latitude AS latitude, usa_longitude AS longitude, usa_wind AS
wind, usa_pressure AS pressure) AS usa,
       STRUCT(tokyo_latitude AS latitude, tokyo_longitude AS longitude,
           tokyo_wind AS wind, tokyo_pressure AS pressure) AS tokyo,
       ... AS cma,
       ... AS hko,
       ... AS newdelhi,
       ... AS reunion,
       ... AS bom,
       ... AS wellington,
       ... AS nadi
   ) ORDER BY iso_time ASC ) AS obs
FROM `bigquery-public-data`.noaa_hurricanes.hurricanes
GROUP BY sid, season, number, basin, name

Notice that we are now storing sid, season, and so on as scalar columns because they do not change over the lifetime of the storm. The remaining data that changes for each observation is stored as an array of structs. Here’s how querying this table now looks:20

SELECT
  number, name, basin, 
  (SELECT AS STRUCT iso_time, usa.latitude, usa.longitude, usa.wind 
          FROM UNNEST(obs) ORDER BY usa.wind DESC LIMIT 1).*
FROM ch07.hurricanes_nested_track
WHERE season = '2018'
ORDER BY number ASC

The result is the same, but this time the query processes only 14.7 MB (a reduction in cost of three times) and finishes in one second (a 30% improvement in speed). Why does this improvement in performance happen? When we store the data as an array, the number of rows in the table reduces dramatically (from 682,000 to 14,000),21 because there is now only one row per storm instead of one row per observation time. Then when we filter the rows by checking for season, BigQuery is able to discard many related observations simultaneously, as shown in Figure 7-13.

Nested and repeated fields can speed up query performance by allowing BigQuery to discard many related observations simultaneously.
Figure 7-13. Nested and repeated fields can speed up query performance by allowing BigQuery to discard many related observations simultaneously

Another benefit is that we no longer need to duplicate rows of data when we have unequal levels of granularity within the same table. You can store both the granular-level individual hurricane latitude and longitude data as well as the high-level hurricane ID, name, and season data in the same table. And because BigQuery stores table data as highly compressed individual columns, you can query and process the high-level data without the cost of operating over the rows of granular data—it is now stored as an array of values per hurricane.

For example, if you simply want to query the number of storms by year, you could query just the columns you want from the details:

WITH hurricane_detail AS (
 
SELECT sid, season, number, basin, name,
 ARRAY_AGG(
   STRUCT(
       iso_time,
       nature,
       usa_sshs,
       STRUCT(usa_latitude AS latitude, usa_longitude AS longitude, usa_wind AS
wind, usa_pressure AS pressure) AS usa,
       STRUCT(tokyo_latitude AS latitude, tokyo_longitude AS longitude, tokyo_wind
AS wind, tokyo_pressure AS pressure) AS tokyo
   ) ORDER BY iso_time ASC ) AS obs
FROM `bigquery-public-data`.noaa_hurricanes.hurricanes
GROUP BY sid, season, number, basin, name
)
 
SELECT 
  COUNT(sid) AS count_of_storms,
  season
FROM hurricane_detail
GROUP BY season 
ORDER BY season DESC

The preceding query processes 27 MB instead of the 56 MB it would have had to process had we not used nested, repeated fields.

Nested fields by themselves do not improve performance, although they can improve readability by essentially prejoining other related tables into a single location. Nested, repeated fields, on the other hand, are extremely advantageous from a performance standpoint. Consider using nested, repeated fields in your schema, because they have the potential to provide you a significant boost in speed and lower query costs whenever you have queries that filter on a column that is not nested or repeated (season, in our case).

A key drawback to nested, repeated fields is that you cannot easily stream to this table if the streaming updates involve adding elements to existing arrays—it is no longer as simple as appending a row to the table: you now need to mutate the existing row. Of course, because the hurricane data is updated with new observations, this drawback would be quite significant, and it explains why the public dataset of hurricanes does not use nested, repeated fields.

On the other hand, the public dataset of GitHub commits (bigquery-public-data.github_repos.commits) uses a nested, repeated field (repo_name) to store the list of repositories affected by a commit. This doesn’t change over time. So using a nested, repeated field for repo_name provides a speedup to queries that filter on any other field.

Storing data as geography types

Within the BigQuery public dataset of utility data is a table of polygon boundaries of US zip codes (available in bigquery-public-data.utility_us.zipcode_area) and another table of polygon boundaries of US cities (bigquery-public-data.utility_us.us_cities_area). The US zip code geometry column (zipcode_geom) is a string,24 whereas the city geometry column (city_geom) is a geography type.

From these two tables, it is possible to get a list of all the zip codes for Santa Fe, New Mexico:25

SELECT name, zipcode 
FROM `bigquery-public-data`.utility_us.zipcode_area
JOIN `bigquery-public-data`.utility_us.us_cities_area
ON ST_INTERSECTS(ST_GeogFromText(zipcode_geom), city_geom)
WHERE name LIKE '%Santa Fe%'

The query took 51.9 seconds and processed 305.5 MB to produce the following:

Row name zipcode
1 Santa Fe, NM 87505
2 Santa Fe, NM 87501
3 Santa Fe, NM 87507
4 Eldorado at Santa Fe, NM 87508
5 Santa Fe, NM 87508
6 Santa Fe, NM 87506

Why did the query take so long? This is not because ST_INTERSECTS is expensive; it is mainly because the ST_GeogFromText function needs to compute the S2 cells26 and build a GEOGRAPHY type corresponding to each zip code.

We can change the zip code table to do this computation beforehand and store the geometry as a GEOGRAPHY type:

CREATE OR REPLACE TABLE ch07.zipcode_area AS
SELECT 
  * REPLACE(ST_GeogFromText(zipcode_geom) AS zipcode_geom)
FROM
  `bigquery-public-data`.utility_us.zipcode_area
Tip

SELECT * REPLACE (see the previous snippet) is a convenient way to replace a column from a SELECT * statement.

The new dataset is 131.8 MB, which is somewhat larger than the 116.5 MB of the original table. However, the trade-off is that queries against the table can take advantage of S2 coverings and become much faster. Thus, the following query finishes in 5.3 seconds (a speed increase of 10 times) and processes 320.8 MB (a slight increase in cost if you are using on-demand pricing):

SELECT name, zipcode 
FROM ch07.zipcode_area
JOIN `bigquery-public-data`.utility_us.us_cities_area
ON ST_INTERSECTS(zipcode_geom, city_geom)
WHERE name LIKE '%Santa Fe%'

The performance advantages of storing geographic data as GEOGRAPHY types instead of as strings or primitives are very compelling. This is why the utility_us dataset is deprecated (it’s still publicly accessible so as to not break already written queries). We recommend that you use the table bigquery-public-data.geo_us_boundaries.us_zip_codes, which uses GEOGRAPHY types and is kept up to date.

Partitioning Tables to Reduce Scan Size

Imagine that you frequently query the london_bicycles dataset by year:

SELECT 
  start_station_name 
  , AVG(duration) AS avg_duration
FROM `bigquery-public-data`.london_bicycles.cycle_hire
WHERE EXTRACT(YEAR from start_date) = 2015 
GROUP BY start_station_name
ORDER BY avg_duration DESC
LIMIT 5

This takes 2.8 seconds and processes 1 GB to return the stations responsible for the longest trips in 2015:

Row start_station_name avg_duration
1 Mechanical Workshop Penton 105420.0
2 Contact Centre, Southbury House 5303.75
3 Stewart’s Road, Nine Elms 4836.380090497735
4 Black Lion Gate, Kensington Gardens 4788.747908066496
5 Speakers’ Corner 2, Hyde Park 4610.192911183014

The query, however, must read through the entire table to find rows from 2015. In this section, we look at various ways to cut down on the size of the data being processed.

Antipattern: Table suffixes and wildcards

If filtering by year is very common, one way to cut down on the data being read is to store the data in multiple tables, with the name of each table suffixed by the year. This way, querying the data for 2015 does not require traversing rows corresponding to all the years but can instead just read the cycle_hire_2015 table.

Let’s go ahead and create such a table using the following:

CREATE OR REPLACE TABLE ch07eu.cycle_hire_2015 AS (
  SELECT * FROM `bigquery-public-data`.london_bicycles.cycle_hire
  WHERE EXTRACT(YEAR from start_date) = 2015
)

Querying this year-sharded table finishes in one second (a speed increase of three times) and needs to process only 345 MB (a cost saving of three times):

SELECT 
  start_station_name 
  , AVG(duration) AS avg_duration
 FROM ch07eu.cycle_hire_2015
 GROUP BY start_station_name
 ORDER BY avg_duration DESC
 LIMIT 5
Note

Use partitioned tables and template tables (covered next) instead of manually splitting your data across multiple tables.

It is possible to use wildcards and table suffixes to search for multiple years:

SELECT 
  start_station_name 
  , AVG(duration) AS avg_duration
FROM `ch07eu.cycle_hire_*`
WHERE _TABLE_SUFFIX BETWEEN '2015' AND '2016'
GROUP BY start_station_name
ORDER BY avg_duration DESC
LIMIT 5

Partitioned tables

Partitioned tables allow you to store all of your related data in a single logical table but also efficiently query a subset of that data. If, for example, you store the last year’s worth of data but usually query only the last week, partitioning by time allows you to run queries that will need to scan only the last seven days’ worth of partitions. This can save orders of magnitude in query cost, slot utilization, and time.

The year-named tables discussed in the previous section are inefficient: BigQuery needs to maintain a copy of the schema and metadata for each of the sharded tables and verify permissions on each of the queried tables. Also, a single query cannot query more than 1,000 tables, and this might make querying the entire dataset difficult if your tables are sharded by date rather than by year (1,000 tables is not even three years of data if the tables are date-sharded). Furthermore, streaming into date-sharded tables can lead to the need for clock and time zone synchronization among multiple clients. The recommended best practice, therefore, is to use partitioned tables.

Tip

Partitioning and clustering are the most effective ways to reduce your query cost and improve performance. When in doubt, you should partition and cluster your tables; this enables a number of performance optimizations that are not available to unpartitioned or unclustered tables.

A partitioned table is a special table that is divided into partitions, with the partitions managed by BigQuery. We can create a partitioned version of the London cycle_hire dataset using the following:

CREATE OR REPLACE TABLE ch07eu.cycle_hire_partitioned
   PARTITION BY DATE(start_date) AS
SELECT * FROM `bigquery-public-data`.london_bicycles.cycle_hire
Tip

You can keep storage costs in check by specifying an expiration time for a partition and asking BigQuery to ensure that users are always using a partition filter (and not querying the entire table by mistake):

CREATE OR REPLACE TABLE ch07eu.cycle_hire_partitioned
   PARTITION BY DATE(start_date) 
   OPTIONS(partition_expiration_days=1000,    
           require_partition_filter=true) AS
SELECT * FROM `bigquery-public-data`.london_bicycles.cycle_hire

If you forget to set this option at the time of creating the table, you can always add it after the fact:

ALTER TABLE ch07eu.cycle_hire_partitioned
SET OPTIONS(require_partition_filter=true)

Then, to find the stations with the longest average rentals in 2015, query the partitioned table, making sure to use the partition column (start_date) in the filter clause:

SELECT 
  start_station_name 
  , AVG(duration) AS avg_duration
FROM ch07eu.cycle_hire_partitioned
WHERE start_date BETWEEN '2015-01-01' AND '2015-12-31'
GROUP BY start_station_name
ORDER BY avg_duration DESC
LIMIT 5

The query takes one second and processes only 419.4 MB, a little more than the year-sharded table (because of the need to read the start_date column), but it is still a saving over having to read the full dataset. Note, however, that there are disadvantages to formulating the query as follows:

SELECT 
  start_station_name 
  , AVG(duration) AS avg_duration
FROM ch07eu.cycle_hire_partitioned
WHERE EXTRACT(YEAR FROM start_date) = 2015
GROUP BY start_station_name
ORDER BY avg_duration DESC
LIMIT 5

This will end up processing 1 GB and will not yield any savings on the amount of data processed. To obtain the benefits of partitioning, the BigQuery runtime must be able to statically determine the partition filters.

Tip

It is possible to ask BigQuery to automatically partition the table based on ingestion time rather than a date/time column. To do so, use _PARTITIONTIME or _PARTITIONDATE as the partitioning column. These are pseudocolumns that refer to the ingestion time and do not actually exist in your dataset. You can, however, use these pseudocolumns in your queries to restrict the rows being scanned.

If you are streaming to an ingestion-time-partitioned table, data in the streaming buffer is held in the __UNPARTITIONED__ partition. To query data in the __UNPARTITIONED__ partition, look for NULL values in the _PARTITIONTIME pseudocolumn. 
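
As a sketch (the table name here is hypothetical), the first query below restricts the scan of an ingestion-time-partitioned table via the pseudocolumn, and the second checks the streaming buffer:

SELECT 
  start_station_name
  , AVG(duration) AS avg_duration
FROM ch07eu.cycle_hire_ingest  -- hypothetical ingestion-time-partitioned table
WHERE _PARTITIONDATE BETWEEN '2015-06-01' AND '2015-06-07'
GROUP BY start_station_name;

-- Rows still in the streaming buffer have a NULL value for the pseudocolumn
SELECT COUNT(*) AS rows_in_streaming_buffer
FROM ch07eu.cycle_hire_ingest
WHERE _PARTITIONTIME IS NULL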

Clustering Tables Based on High-Cardinality Keys

Clustering, like partitioning, is a way to instruct BigQuery to store data in a way that can allow less data to be read at query time. Whereas a partitioned table behaves similarly to a number of independent tables (one per partition), clustered tables are stored in a sorted format as a single table. This ordering allows unlimited unique values to be stored without any performance penalty, and it also means that when a filter is applied, BigQuery can skip opening any file that doesn’t contain the range of values being requested.

Clustering can be done on any primitive nonrepeated columns (INT64, BOOL, NUMERIC, STRING, DATE, GEOGRAPHY, and TIMESTAMP). Usually, you’d cluster on columns that have a very high number of distinct values, like customerId if you have millions of customers. If you have columns that aren’t as high in cardinality but are frequently used together, you can cluster by more than one column at a time. When you cluster by multiple columns, you can filter by any prefix of the clustering columns and realize the benefits of clustering.

If most of the queries on our bicycles dataset use start_station_name and end_station_name, we could optimize the storage to take advantage of this commonality in our queries by creating the table as follows:

CREATE OR REPLACE TABLE ch07eu.cycle_hire_clustered
   PARTITION BY DATE(start_date) 
   CLUSTER BY start_station_name, end_station_name   
AS ( 
 SELECT * FROM `bigquery-public-data`.london_bicycles.cycle_hire
)

Then queries that use the clustering columns in order could experience a significant benefit—for example:

SELECT 
  start_station_name
  , end_station_name
  , AVG(duration) AS duration
FROM ch07eu.cycle_hire_clustered
WHERE 
  start_station_name LIKE '%Kennington%'
  AND end_station_name LIKE '%Hyde%'    
GROUP BY start_station_name, end_station_name

But in this case, the entire table is only 1.5 GB and fits into a single block, thus there is no improvement.

To see the benefits of clustering, we must use a larger table. Our colleague Felipe Hoffa has conveniently created a clustered table of 2.20 TB of Wikipedia views, where the clustering was as follows:

CLUSTER BY wiki, title

As long as we use wiki (and, optionally, title) in our queries, we will gain the benefits of clustering. For example, we can search English Wikipedia for the number of page views in June 2017 for articles whose titles contained the term “Liberia”:

SELECT title, SUM(views) AS views
FROM `fh-bigquery.wikipedia_v3.pageviews_2017` 
WHERE DATE(datehour) BETWEEN '2017-06-01' AND '2017-06-30'
AND wiki = 'en'
AND title LIKE '%Liberia%'
GROUP BY title

This query took 4.8 seconds and processed 38.6 GB. Had the table not been clustered (only partitioned),27 the query would have taken 25.9 seconds (five times slower) and processed 180.2 GB (five times costlier). On the other hand, a query that doesn’t first filter by wiki will not experience any benefits.

Clustering by the partitioning column

Partitioned tables are partitioned by date (whether it is a column or it is by ingestion time). If you need hour-level partitioning, one option is to use date-based partitioning and then cluster by hour along with whatever other attributes are appropriate.

So a common pattern is to cluster by the same column as you partition by. For example, if you partition by event_time, a timestamp at which your log events occur, this will let you do very fast and efficient queries over arbitrary time periods that are smaller than the day boundary used in the partitioning. You could, for instance, query over only the last 10 minutes of data, and you wouldn’t need to scan anything older than that.
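
Here is a minimal sketch of that pattern; the logs table and its event_time column are hypothetical:

CREATE OR REPLACE TABLE ch07.logs_clustered
  PARTITION BY DATE(event_time)
  CLUSTER BY event_time
AS SELECT * FROM ch07.logs;  -- hypothetical source table with an event_time TIMESTAMP

-- Reads only the blocks covering the most recent few minutes
SELECT COUNT(*) AS recent_events
FROM ch07.logs_clustered
WHERE event_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 10 MINUTE)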

In a partitioned table, each partition consists of a single day of data, and BigQuery maintains the metadata necessary to ensure that queries, load jobs, and Data Definition Language (DDL)/DML statements all take advantage and maintain the integrity of the partitions.

Reclustering

In a clustered table, BigQuery sorts the data based on the values in the clustering columns and organizes them into storage blocks that are optimally sized for efficient scanning and discarding of unnecessary data. However, unlike with partitioning, BigQuery does not maintain the sorting of data within clusters as data is streamed to it. BigQuery will recluster the data periodically in order to maintain efficient data pruning and scan speed. You can see how efficiently clustered the table is by looking at the clustering_ratio of the table (1.0 is completely optimal).

Modifying a table using DML forces a recluster of the affected partitions. For example, assume that you periodically receive a table of corrections (which consist of cycle hires that were somehow missed in the previous updates). Using a MERGE statement such as the following will cause a recluster of any partitions into which rows are inserted:

MERGE ch07eu.cycle_hire_clustered all_hires
USING ch07eu.cycle_hire_corrections some_month
ON all_hires.rental_id = some_month.rental_id
   AND DATE(all_hires.start_date) = DATE(some_month.start_date)
WHEN NOT MATCHED THEN
  INSERT (rental_id, duration, ...)
  VALUES (rental_id, duration, ...)
Tip

If you don’t want to wait until BigQuery gets around to reclustering the table into which you have streamed updates, you can take advantage of the ability of DML statements to force a recluster. For example, you can apply a no-op UPDATE to the partitions of interest (perhaps those written in the past 24 hours):

UPDATE ch07eu.cycle_hire_clustered 
SET start_station_id = 300
WHERE start_station_id = 300
AND start_date > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)

Table 7-1 summarizes the differences between partitioning and clustering and might help you choose between partitioning a table based on a column and clustering the table by that column.

Table 7-1. Partitioning versus clustering
                   Partitioning                                      Clustering
Distinct values    Less than 10,000                                  Unlimited
Data management    Like a table (can expire, delete, etc.)           DML only
Dry run cost       Precise                                           Upper bound
Final cost         Precise                                           Block level (difficult to predict)
Maintenance        None (exact partitioning happens immediately)     Background (impact might be delayed until background clustering occurs)

Side benefits of clustering

Recall that running SELECT * ... LIMIT 10 in BigQuery is an antipattern, because it ends up billing you for a full scan of the table. For clustered tables, this is not true. When you’re reading from a clustered table, BigQuery will pass along any optimizations that can be done to prevent reading data. So if you do SELECT * ... LIMIT 10 on a clustered table, the execution engine will be able to stop reading data as soon as 10 rows have been returned. Because of how the query engine employs a number of parallel workers, any of which could happen to finish first, the amount of data that is scanned is not deterministic. On the plus side, you will end up with queries that cost you much less on large tables.

A surprising side effect of the “early stop” cost reduction is that you can get performance benefits even if you don’t filter on the clustering columns—if you filter by columns that are correlated to your clustering columns, BigQuery might be able to read less data!

Suppose that you have a table with two columns: zip_code and state. You cluster based on zip_code, which means that the data is sorted by zip_code when it is stored on disk. In the United States, there is a correlation between state and zip_code because zip code ranges are assigned geographically (00000 in the Northeast and 99999 in the Northwest). If you run a query that filters by state, even though you’re not filtering by the clustering column, BigQuery skips any data block that doesn’t have that state, and you’ll end up paying less for the query.
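
Here is a sketch of that situation; the addresses table is hypothetical and assumed to be clustered by zip_code:

-- Even though the filter is on state rather than the clustering column,
-- BigQuery may skip any block whose zip_code range contains no New Mexico zips.
SELECT COUNT(*) AS num_addresses
FROM ch07.addresses_clustered
WHERE state = 'NM'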

When a table is clustered, it allows BigQuery to apply a number of performance optimizations that are not possible with nonclustered tables. One of these optimizations is designed for star schemas, which let you filter based on constraints in a dimension table. For example, suppose that you have a fact table containing orders, which is clustered by customer_id, and a dimension table containing customers, and you run the following query:

SELECT o.*
FROM orders o 
JOIN customers c USING (customer_id)
WHERE c.name = "Changying Bao"

Ordinarily, this query would need to scan the full orders table in order to execute the join. But because the orders table is clustered by the column that is being used for the join, the righthand side of the join—querying customers and finding the matching customer IDs—is done first. Then the second part of the query just needs to look up the clustering column that matches the customer_id. From a bytes-scanned perspective, it is as if BigQuery ran the following queries in parallel:

// First, look up the customer ID.
// This scans only the small dimension table.
SET id = (SELECT customer_id FROM customers
          WHERE name = "Changying Bao");
// Next, look up that customer's orders in the orders table.
// Because this filters by the clustering column, it needs
// to read only a small amount of data.
SELECT * FROM orders WHERE customer_id = id;

In short, clustering is highly recommended if you want to reduce query costs and improve performance.

Time-Insensitive Use Cases

BigQuery is designed to minimize the time taken to derive insights from data. Consequently, we can carry out ad hoc, interactive analytics on large datasets and stream in updates to those datasets in near real time. Sometimes, though, you might be less time sensitive. Perhaps all you need are nightly reports. In such cases, you might be willing to have your queries be queued up and executed when possible, and this might be sufficient for the reports to reflect data as of an hour ago.

Batch Queries

You can submit a set of queries, called batch queries, to the service, and they will be queued on your behalf and started when idle resources are available. At the time they were introduced, in 2012, batch queries provided a pricing benefit. However, as of this writing, both interactive and batch queries cost the same. If you use flat-rate pricing, both batch queries and interactive queries share your allocated slots.

Because batch queries are not less expensive and use the same reservations and slots as interactive resources, the primary reason that you might want to employ batch queries is that they don’t count toward your concurrent rate limit and can make scheduling hundreds of queries easier.

There are a number of rate limits that affect interactive (i.e., nonbatch) queries. For example, you might be able to have at most 50 queries running concurrently,28 with concurrent byte limits and “large query” limits. If those limits are reached, the query will fail immediately. This is because BigQuery assumes that an interactive query is something the user needs to run immediately. When you use batch queries, on the other hand, if you ever reach a rate limit, the queries will be queued and retried later. There are still similar rate limits, but they operate separately from interactive rate limits, so your batch queries won’t affect your interactive ones.

One example might be that you have periodic queries that you run daily or hourly to build dashboards. Maybe you have 500 queries that you want to run. If you try to run them all at once as interactive, some will fail because of concurrent rate limits. Additionally, you don’t necessarily want these queries to interfere with other queries you are running manually from the BigQuery web UI. So you can run the dashboard queries at batch priority, and the other queries will run normally as interactive.

To use batch queries, provide the --batch flag to the bq command-line tool or specify the job priority in the console or REST API to be BATCH, not INTERACTIVE. If BigQuery hasn’t started the query within 24 hours, BigQuery changes the job priority to interactive. However, the typical wait time before a query starts is on the order of minutes, unless you are submitting more queries than your quota of concurrent requests. Then queries will be run after earlier ones complete.

File Loads

If you care about minimizing the “time to insight” or wish to have the simplest possible data pipeline, we strongly encourage you to use streaming inserts into BigQuery via Cloud Pub/Sub and Cloud Dataflow. This architecture provides ingest speeds on the order of 100,000 rows per second, even on small Cloud Dataflow clusters. You can horizontally scale the Dataflow cluster by adding more machines and achieve several million rows per second of ingest performance without having to tweak any knobs.29

Streaming ingests incur charges, whereas load jobs are free. In some scenarios, you might be willing to trade off a few minutes’ latency for reduced ingestion costs. In that case, consider using file loads instead of streaming inserts. You can do that from Apache Beam’s BigQueryIO using the following:

BigQueryIO.writeTableRows()
    .to("project-id:dataset-id.table-id")
    .withCreateDisposition(
        BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    .withMethod(Method.FILE_LOADS)
    .withTriggeringFrequency(Duration.standardSeconds(600))
    .withNumFileShards(10)
    .withSchema(new TableSchema()...)
    .withoutValidation()

The previous code snippet writes to BigQuery every 10 minutes using file loads, thus avoiding the streaming ingestion charge. File loads can scale to 300,000 rows per second on medium-sized Dataflow clusters. However, you should be aware that computing and finalizing windows does take time, and so there will be a latency on the order of a few minutes. Because of per-table and per-project load quotas, and because failures and retries count against quota, we recommend that you do file loads no more frequently than every five minutes.

Summary

In this chapter, we looked at ways to control costs by using the dry_run feature and by setting limits on the number of bytes billed. Then we examined ways to measure query speed using the REST API or a custom measurement tool.

We also covered several methods for increasing query speed. To minimize I/O overhead, we recommended ways to reduce the amount of data being read. We examined the different entities that can be cached, from previous query results to intermediate results and entire tables in memory, and we looked at how to perform joins efficiently: by avoiding self-joins, reducing the data being joined, and taking advantage of precomputed values. By limiting large sorts, safeguarding against data skew, and optimizing user-defined functions, we can minimize the chances of slots getting overwhelmed. We also recommended the use of approximate aggregation functions, including the approximate count and top functions.

Finally, we looked at ways of optimizing data storage and speeding up data access. We suggested that our applications be refactored to accept compressed, partial responses, send requests in batches, and perform bulk reads using the Storage API. We discovered that storing data as arrays of structs and as geography types brings performance advantages. We also looked at different ways of reducing the size of data being scanned, namely through partitioning and clustering.

Checklist

We’d like to end this chapter the way we began, by recommending that, although these performance improvements can be significant, you should verify that they apply to your workflow. Follow this checklist if your query is running slow:

If you observe that:                     Possible solutions
Self-join is being used                  Use aggregate functions to avoid self-joins of large tables
                                         Use window (analytic) functions to compute self-dependent relationships
DML is being used                        Batch your DML (INSERT, UPDATE, DELETE) statements
Join is slow                             Reduce data being joined
                                         Perhaps denormalize the data
                                         Use nested, repeated fields instead
Queries are being invoked repeatedly     Take advantage of query caching
                                         Materialize previous results to tables
Workers are overwhelmed                  Limit large sorts in window functions
                                         Check for data skew
                                         Optimize user-defined functions
Count, top, distinct are being used      Consider using approximate functions
I/O stage is slow                        Minimize network overhead
                                         Perhaps use a join to reduce table size
                                         Choose efficient storage format
                                         Partition tables
                                         Cluster tables

1 This quote is from his 1974 article “Structured Programming with go to Statements.” You can download a PDF of the article from http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.103.6084.

2 The difference can be difficult to measure precisely because BigQuery is a service, and network overhead to reach the service and the load on the service tends to vary. You might need to run the query many times to get a good estimate of the speed.

3 A slot is a unit of computational capacity required to execute SQL queries. BigQuery automatically calculates how many slots are required by each query. See Chapter 6 for more details.

4 Nonparallelizable operations might not add to the cost, because the remaining slots can presumably address other workloads.

5 See https://cloud.google.com/bigquery/docs/sandbox for further details.

6 See 07_perf/time_query.sh in the GitHub repository for this book.

7 See 07_perf/install_workload_tester.sh in the GitHub repository for this book.

8 See 07_perf/time_bqwt.sh in the GitHub repository for this book.

9 This is 07_perf/get_job_details.sh in the GitHub repository for this book. You can get the required job ID from the “Query history” in the BigQuery web UI. If your query was run outside the US and EU, you also need to specify the job location in a job resource object (sorry!). You can also get the job details by using bq ls -j.

10 If you are reading this in a format that renders the diagram in grayscale, please try out the query and look at the query details in the BigQuery web UI.

11 You will need to measure this, of course. In some cases, the extra overhead involved in reading the table of intermediate results will make this more expensive than simply recomputing the results of a WITH clause.

12 You can verify that this is the key reason for the incorrect value for num_babies by adding state and year to the USING clause (and making sure to add the two fields to the first two selects). Then the number of babies is in the correct ballpark (e.g., 2,018,162 for Jessie, whereas the correct answer is 229,263). The answer is still incorrect because rows with NULLs for these fields are ignored by the join (NULL is never equal to anything else).

13 In bicycling and running, pace is the inverse of speed.

14 This is a current limitation of the BigQuery dynamic execution runtime; it might be eased in the future.

15 This is 07_perf/get_job_details_compressed.sh in the GitHub repository for this book.

16 This is 07_perf/get_recent_jobs.sh in the GitHub repository for this book.

17 You can if you want to, however; see https://cloud.google.com/bigquery/docs/reference/storage/samples. The endpoint for the BigQuery Storage API is different from that of the BigQuery REST API—it’s bigquerystorage.googleapis.com.

18 For more details, see https://cloud.google.com/storage/docs/managing-lifecycles#change_an_objects_storage_class.

19 You can find all queries in this section in 07_perf/hurricanes.sql in the GitHub repository for this book.

20 Chapter 2 covers SQL syntax for working with arrays.

21 Because the hurricanes dataset is continually refreshed, the row numbers might be larger when you are reading this.

22 The dataset is bigquery-public-data.google_analytics_sample.ga_sessions_20170801.

23 This is in 07_perf/google_analytics.sql in the GitHub repository for this book.

24 This is actually a deprecated dataset for reasons that will become apparent in this section. The more up-to-date data is bigquery-public-data.geo_us_boundaries.us_zip_codes—that dataset does use geography types.

25 Chapter 4 covers geographic types and GIS functions.

26 See https://oreil.ly/PkIsx.

27 Try it using fh-bigquery.wikipedia_v2.pageviews_2017.

28 This is the default, but you can request these quotas to be increased. See https://cloud.google.com/bigquery/quotas.

29 Because streaming ingests are charged for, you might have to ask for a quota increase. Check the default quotas at https://cloud.google.com/bigquery/quotas#streaming_inserts and request additional streaming quota from the GCP console.
