Performance tuning of BigQuery is usually carried out because we want to reduce query execution times or cost, or both. In this chapter, we look at a number of performance optimizations that might work for your use case.
Donald Knuth, the legendary computer scientist, made the famous observation that premature optimization is the root of all evil. Yet Knuth’s full quote is more balanced:1
We should forget about small efficiencies, say about 97% of the time: premature optimization is the root of all evil. Yet we should not pass up our opportunities in that critical 3%. A good programmer will not be lulled into complacency by such reasoning, he will be wise to look carefully at the critical code; but only after that code has been identified.
Following Knuth, we would like to caution that performance tuning should be carried out only at the end of the development stage, and only if it is observed that typical queries take too long. It is far better to have flexible table schema and elegant, readable, and maintainable queries than to obfuscate your table layouts and queries in search of a tiny bit of added performance. However, there will be instances for which you do need to improve the performance of your queries, perhaps because they are carried out so often that small improvements are meaningful. Another aspect to consider is that knowledge of performance trade-offs can help you in deciding between alternative designs.
In this chapter, we do two things. In the first part, we show you how to measure the performance of queries so that you can identify the critical, optimizable parts of your program. Then we draw on the experience of BigQuery users and our knowledge of BigQuery’s architecture to identify the types of things that tend to fall into Knuth’s critical 3%. This way, we can design table schema and queries with an awareness of where performance bottlenecks are likely to occur, thereby helping us make optimal choices during the design phase.
To optimize the performance of queries in BigQuery, it helps to understand the key drivers of query speed, which is the focus of the second part of this chapter. The time taken for a query to complete depends on how much data is read from storage, how that data is organized, how many stages your query requires, how parallelizable those stages are, how much data is processed at each stage, and how computationally expensive each of the stages is.
In general, a simple query that reads three columns will take 50% more time than a query that reads only two columns, because the three-column query needs to read 50% more data.2 A query that requires a group-by will tend to be slower than a query that doesn’t, because the group-by operation adds an extra stage to the query.
The cost of a query depends on your pricing plan. There are two types of BigQuery pricing plans. The first type is on-demand pricing, in which your organization pays Google based on the amount of data processed by your queries. If you are on a flat-rate plan, your business gets a certain number of slots3 (e.g., 500 slots), and you can run as many queries as you want without incurring any additional costs.
In an on-demand (per query) pricing plan, the cost of a query is proportional to the amount of data processed by the query. To reduce cost in an on-demand pricing model, your queries should process less data. In general, reducing the amount of data scanned will also improve query speed. The third part of this chapter—optimizing how data is stored and accessed—should be of help there.
If you are using a flat-rate reservation, the net cost of your query is quite aligned with the time taken for the query to complete.4 You can indirectly reduce costs in a flat-rate model by hogging the slot reservations for less time—that is, by increasing your query speeds, as discussed in the second part of this chapter.
If you are on an on-demand pricing plan, you can obtain a cost estimate for a query before submitting it to the service. The BigQuery web user interface (UI) validates queries and provides an estimate of the amount of data that will be processed. You can see the number of bytes the query will process before you run the query by clicking the Query Validator. If you are using the `bq` command-line client, specify `--dry_run` to take a look at the query plan and the amount of data processed before invoking the query for real. Dry runs are free. Knowing the amount of data that will be processed by the query, you can use the Google Cloud Platform (GCP) Pricing Calculator to estimate the cost of the query in dollars. Tools such as BigQuery Mate and superQuery provide a price estimate directly but might not have access to information about negotiated discounts. As of this writing, the cost is five dollars per terabyte (for US and EU multiregions) after the free tier usage of one free terabyte per month is exceeded. Note that BigQuery needs to read only the columns that are referenced in your queries, and partitioning and clustering can further reduce the amount of data that needs to be scanned (and hence the cost).
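To make the arithmetic concrete, the on-demand cost estimate can be sketched in a few lines of Python (the function and variable names are ours; the $5/TB rate and 1 TB free tier are the figures quoted above):

```python
# Sketch of the on-demand cost arithmetic described above:
# $5 per TB scanned, after a free tier of 1 TB per month.
TB = 2**40  # BigQuery bills in binary terabytes

def estimate_query_cost(bytes_processed, free_bytes_remaining=TB, usd_per_tb=5.0):
    """Estimate the dollar cost of a query from its dry-run byte count."""
    billable = max(0, bytes_processed - free_bytes_remaining)
    return billable / TB * usd_per_tb

# A query scanning 2.5 TB with the full free tier remaining bills 1.5 TB at $5/TB.
print(estimate_query_cost(2.5 * TB))  # 7.5
```

Feed the byte count reported by a `--dry_run` into a helper like this to sanity-check a query's cost before running it for real.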
To experiment with BigQuery, you can use the BigQuery sandbox. This is subject to the same limits as the free tier (10 GB of active storage and 1 TB of processed query data per month), but you can access it without a credit card being required.5
When invoking a query, you can specify the `--maximum_bytes_billed` parameter to put a limit on the amount of data that a query can process. If the bytes scanned in the query exceed the maximum bytes that can be billed, the query will fail without incurring a charge. You can also manage costs by requesting a custom quota from the GCP Cloud Console for a limit on the amount of query data processed per day. You can set this limit at a per-project or per-user level.
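Conceptually, the cap works like this sketch (ours, not BigQuery's actual implementation): compare the estimated bytes against the limit before anything is billed, and fail fast if the query would exceed it.

```python
def check_bytes_cap(dry_run_bytes, maximum_bytes_billed):
    """Mimic the maximum-bytes-billed guard: refuse the query before
    any charge is incurred if the estimate exceeds the cap."""
    if dry_run_bytes > maximum_bytes_billed:
        raise RuntimeError(
            f"Query would process {dry_run_bytes} bytes, "
            f"over the cap of {maximum_bytes_billed}")
    return True

# A 100 MB query passes under a 1 GB cap.
print(check_bytes_cap(100 * 10**6, 10**9))  # True
```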
When trying to control costs, it can be helpful to create a short list of queries to focus on. You can do this by querying the `INFORMATION_SCHEMA` associated with a project to find the most expensive queries:

```sql
SELECT
  job_id
  , query
  , user_email
  , total_bytes_processed
  , total_slot_ms
FROM `some-project`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE EXTRACT(YEAR FROM creation_time) = 2019
ORDER BY total_bytes_processed DESC
LIMIT 5
```
The preceding query lists the five most expensive queries in 2019 in some project based on `total_bytes_processed`. If you are on flat-rate pricing, you might choose to order the queries based on `total_slot_ms` instead.
To tune the performance of queries, it is important to ascertain all of the following aspects of a query so that you know what to focus on:
- How much data is read from storage and how that data is organized
- How many stages your query requires and how parallelizable those stages are
- How much data is processed at each stage and how computationally expensive each stage is
As you can see, each of these pairs something that can be measured (e.g., how much data is read) with something that needs to be understood (e.g., the performance implications of how that data is organized).
In this section, we look at how we can measure the performance of a query, peruse BigQuery logs, and examine query explanations. Having done this, we can use our understanding of BigQuery architecture (Chapter 6) and performance characteristics (later sections in this chapter) to potentially improve performance. In the later sections of this chapter, we present queries and their performance characteristics without spelling out the measurement steps that might lead you to apply those performance improvements.
Because BigQuery has a REST API, it is possible to use any web service measurement tool to measure how long a query takes to execute. If your organization already uses one of these tools, it is quite straightforward to use them to measure the performance of a query.
Occasionally, you will need to measure performance of a query from a server where these rich clients are not installed. On such bare-bones machines, the simplest way to measure query time is to use the Unix tools `time` and `curl`. As explained in Chapter 5, we can read the query text and request JSON into Bash variables:6
```bash
# Quote the heredoc delimiter so the backticks in the table name
# are not executed as command substitution.
read -d '' QUERY_TEXT << 'EOF'
SELECT
  start_station_name
  , AVG(duration) as duration
  , COUNT(duration) as num_trips
FROM `bigquery-public-data`.london_bicycles.cycle_hire
GROUP BY start_station_name
ORDER BY num_trips DESC
LIMIT 5
EOF

read -d '' request << EOF
{
  "useLegacySql": false,
  "useQueryCache": false,
  "query": "${QUERY_TEXT}"
}
EOF
request=$(echo "$request" | tr '\n' ' ')
```
One key point to note from the preceding code snippet is that we need to turn the query cache off so that all the necessary processing is carried out on the server side each time we invoke the query.
Chapter 5 also discusses how to use the `gcloud` command-line tool to get the access token and project ID that we require to invoke the REST API:

```bash
access_token=$(gcloud auth application-default print-access-token)
PROJECT=$(gcloud config get-value project)
```
Finally, we invoke the query repeatedly and compute the total time taken so that we can compute the average query performance and not be at the mercy of occasional network hiccups:
```bash
NUM_TIMES=10
time for i in $(seq 1 $NUM_TIMES); do
  echo -en " ... $i / $NUM_TIMES ..."
  curl --silent \
    -H "Authorization: Bearer $access_token" \
    -H "Content-Type: application/json" \
    -X POST -d "$request" \
    "https://www.googleapis.com/bigquery/v2/projects/$PROJECT/queries" > /dev/null
done
```
When we did this, we got the following result:
```
real  0m16.875s
user  0m0.265s
sys   0m0.109s
```
The total time to run the query 10 times was 16.875 seconds, indicating that the query took 1.7 seconds on average. Note that this includes the roundtrip time to the server and time spent fetching results; it is not purely the query processing time.
We can estimate what this roundtrip time is by turning the query cache on:
```bash
read -d '' request << EOF
{
  "useLegacySql": false,
  "useQueryCache": true,
  "query": "${QUERY_TEXT}"
}
EOF
```
When we repeat the timing, we get the following:
```
real  0m6.760s
user  0m0.264s
sys   0m0.114s
```
Because the query is cached, the new numbers are almost all due to network latency. This indicates that the actual query processing time is (16.875 – 6.760)/10, or about 1 second.
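This back-of-the-envelope calculation can be captured as a tiny helper (the function name is ours): the cached runs approximate pure roundtrip overhead, so subtracting them isolates processing time.

```python
def processing_time_per_query(uncached_total_s, cached_total_s, num_runs):
    """Estimate pure query processing time by netting out network roundtrip,
    which is approximated by the cached (no-recomputation) runs."""
    return (uncached_total_s - cached_total_s) / num_runs

# The numbers measured above: 16.875 s uncached, 6.760 s cached, 10 runs.
print(processing_time_per_query(16.875, 6.760, 10))  # about 1.01 seconds
```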
Although using a web service measurement tool or low-level Unix tools is possible (and may be your only option on bare-bones systems), we recommend that you use the BigQuery Workload Tester for measuring the speed of BigQuery queries in your development environment. Unlike a vanilla web service measurement tool, the Workload Tester is able to net out the roundtrip network time (over which you have little control) and report the query processing time (which is what you want to optimize) without having to repeat the queries. It can measure the time taken for individual queries and for workloads (queries that need to be executed serially), and it can invoke the queries in parallel if you want concurrency testing.
The Workload Tester requires Gradle, an open source build tool. Thus, to install the Workload Tester, you first need to install Gradle. Cloud Shell provides a quick way to try out the Workload Tester. On it and other Debian-based Linux systems, you can install Gradle using the following command:
```bash
sudo apt-get -y install gradle
```
On macOS, you can use this:
```bash
brew install gradle
```
For other operating systems, see the Gradle installation instructions.
Then clone the GitHub repository containing the Workload Tester, and build it from source:7
```bash
git clone https://github.com/GoogleCloudPlatform/pontem.git
cd pontem/BigQueryWorkloadTester
gradle clean :BigQueryWorkloadTester:build
```
Let’s measure the speed of a query to find the average duration of bicycle trips in London. As with our script in the previous section, we could have simply embedded the query in a Bash variable, but it is helpful to have a record of queries measured, so we will write the query text to a file:8
```bash
# Quote the heredoc delimiter so the backticks in the table name
# are not executed as command substitution.
cat << 'EOF' | tr '\n' ' ' > queries/busystations.sql
SELECT
  start_station_name
  , AVG(duration) as duration
  , COUNT(duration) as num_trips
FROM `bigquery-public-data`.london_bicycles.cycle_hire
GROUP BY start_station_name
ORDER BY num_trips DESC
LIMIT 5
EOF
```
We then create a configuration file for each workload, where a workload consists of a set of queries or query files:
```bash
cat << EOF > ./config.yaml
concurrencyLevel: 1
isRatioBasedBenchmark: true
benchmarkRatios: [1.0, 2.0]
outputFileFolder: $OUTDIR
workloads:
  - name: "Busy stations"
    projectId: $PROJECT
    queryFiles:
      - queries/busystations.sql
    outputFileName: busystations.json
EOF
```
In this configuration, we set the base concurrency level to `1`, meaning that we send only one query at a time. We do, however, also specify a set of benchmark ratios to measure query times at concurrency levels between `1.0` and `2.0` times the base concurrency level (i.e., 1 and 2 concurrent queries). To try concurrency levels of 1, 2, 5, 10, 15, and 20, use the following:
```yaml
concurrencyLevel: 10
isRatioBasedBenchmark: true
benchmarkRatios: [0.1, 0.25, 0.5, 1.0, 1.5, 2.0]
```
Then we launch the measurement tool by using this:
```bash
gradle clean :BigQueryWorkloadTester:run
```
The result is a file that contains both the total elapsed time (including roundtrip) and the actual processing time for each of the queries that were run. In our case, the first query (with concurrency level of 1) had a query processing time of 1,111 milliseconds (1.111 seconds), whereas the second and third queries (which ran simultaneously because of the concurrency level of 2) had processing times of 1.108 seconds and 1.026 seconds. In other words, BigQuery provided nearly the same performance whether it was handling one query or two.
Aside from measuring the speed of individual queries, it can be helpful to gauge the performance of entire workflows using the BigQuery logs. You can do this from the GCP web console, in the Stackdriver Logging section. For example, you can look at warnings (and more severe errors) issued by BigQuery for queries and operations on the dataset from Chapter 5 by selecting the `ch05` dataset and `Warning` from the drop-down menus, as shown in Figure 7-1.
This ability to view all BigQuery error messages from a project in a centralized location can be helpful, especially if the queries emanate from scripts and dashboards that don’t display BigQuery error messages. Indeed, it is possible (provided you have the necessary permissions) to look at the logs and piece together the set of operations that are being carried out by a workload to determine whether unnecessary operations are being performed. For example, look at the subset of the operations on the dataset `ch05eu`, shown in Figure 7-2, and read it from the bottom to the top.
In this case, it appears that a dataset named `ch05eu` was created, and a table named `cycle_stations_copy` was added to it. Then an attempt was made to delete the `ch05eu` dataset, but it failed because the dataset was not empty. A new table named `bad_bikes` was added. After this, the `bad_bikes` table was deleted, and the `cycle_stations_copy` table was also deleted. Finally, the dataset itself was deleted.
We can examine the details of each of the jobs—for example, let’s look at the first insert job, which created the `cycle_stations_copy` table. The details include the schema of the created table, as illustrated in Figure 7-3.
Given these details and knowledge of the context, it might be the case that the `cycle_stations_copy` table did not use any of the fields in `bad_bikes`. Perhaps the entire set of operations around `bad_bikes` was unnecessary and can be removed from the workflow.
In addition to measuring query speed and examining the logs, you can diagnose query performance by looking at information available about the query plan. The query plan information lists the stages into which the query is broken down and provides information about the data processed in each of the execution steps that make up each stage. The query plan information is available in JSON form from the job information and visually in the BigQuery web UI.
In BigQuery, the execution graph of an SQL statement is broken up into query stages, where each stage consists of units of work that are executed in parallel by many workers. The stages communicate via a distributed shuffle architecture (see Chapter 6), and so most stages start by reading the output of previous stages and end by writing to the input of subsequent stages. Keep in mind that it is not necessary for a previous stage to complete before a subsequent stage starts—stages can start with the data at hand. So stages do not execute sequentially.
You should keep in mind that the query plan is dynamic given that the exact data size and computational cost of intermediate stages is not known before the stage in question is executed. If the actual size is very different from the anticipated size, new stages might be introduced so as to repartition the data and improve data distribution among the workers. Because of the dynamic nature of the query plan, when exploring query performance, look at the query plan information after the query is complete.
The information listed about each stage of a completed query in the job information includes the timestamps at which the stage was started and finished, the total number of records read and written, and the number of bytes written across all workers in order to process this query. For example, try executing the following query:
```sql
SELECT
  start_station_name
  , AVG(duration) AS duration
  , COUNT(duration) AS num_trips
FROM `bigquery-public-data`.london_bicycles.cycle_hire
GROUP BY start_station_name
ORDER BY num_trips DESC
LIMIT 5
```
Now you can list the job details by invoking the REST API:9
```bash
JOBID=8adbf3fd-e310-44bb-9c6e-88254958ccac   # CHANGE
access_token=$(gcloud auth application-default print-access-token)
PROJECT=$(gcloud config get-value project)
curl --silent \
  -H "Authorization: Bearer $access_token" \
  -X GET \
  "https://www.googleapis.com/bigquery/v2/projects/$PROJECT/jobs/$JOBID"
```
For example, the data about the first stage includes the following information about the time spent waiting for data, the ratio of I/O to compute in the query, and data read, shuffled, and written out:
"waitRatioAvg": 0.058558558558558557, "readRatioAvg": 0.070270270270270274, "computeRatioAvg": 0.86036036036036034 ... "shuffleOutputBytes": "356596", "shuffleOutputBytesSpilled": "0", "recordsRead": "24369201", "recordsWritten": "6138", "parallelInputs": "7",
Because this is primarily an input stage, we care primarily about read and shuffle performance—because this stage kept the processor busy 86% of the time and no data needed to be spilled to disk, it appears that bottlenecks (if any) in this query are not related to I/O. If this query is slow, we’ll need to look elsewhere, perhaps at the reasons for the relatively low degree of parallelism (`7`); in this case, the low count is fine because the input data of 24 million records is rather small and quite capable of being processed in seven chunks.
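When a query has many stages, it can help to triage them programmatically. This sketch (our own; the classification logic is illustrative) labels a stage by its dominant ratio from the JSON shown above and flags shuffle spills:

```python
def diagnose_stage(stage):
    """Label a query stage by its dominant activity (wait, read, or compute)
    and flag whether shuffle output was spilled to disk."""
    ratios = {
        "wait": stage["waitRatioAvg"],
        "read": stage["readRatioAvg"],
        "compute": stage["computeRatioAvg"],
    }
    dominant = max(ratios, key=ratios.get)
    spilled = int(stage.get("shuffleOutputBytesSpilled", "0")) > 0
    return dominant, spilled

# The first-stage numbers shown above: compute-dominated, no spill.
stage = {
    "waitRatioAvg": 0.0586, "readRatioAvg": 0.0703, "computeRatioAvg": 0.8604,
    "shuffleOutputBytesSpilled": "0",
}
print(diagnose_stage(stage))  # ('compute', False)
```

Running a helper like this over every stage in the job details quickly surfaces stages that are wait-bound or spilling, which are the ones worth examining first.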
Much of the information about the query plan in the job details is represented visually in the BigQuery web UI. Open the query and click the “Execution details” tab to see a depiction of the query plan. This includes the overall timing as well as a breakdown of this timing at the level of the steps that make up each stage of the query.
As Figure 7-4 shows, the “Execution details” tab provides key information about query performance.
The overall timing depicted in Figure 7-5 indicates seven parallel inputs being active in the first 0.6 seconds, and then two more units being activated.
Further details of the stages and steps are provided visually as well, as illustrated in Figure 7-6.
From Figure 7-6, we see that the query has three stages. The first stage (`S00`) is the input stage, the second (`S01`) consists of sorting, and the third (`S02`) is the output stage. The timing of each stage is shown visually in Figure 7-6 as well. The ratios that were available numerically in the JSON response to the job details request are depicted in the color bars shown in Figure 7-7.10
The dark colors indicate the average time spent waiting, reading, computing, or writing. Here, most of the time is spent performing computation, and a little bit of time is spent waiting. The wait stage is somewhat variable (the intermediate color indicates the maximum wait, and the difference between the average and the maximum is an indicator of variability)—here, the maximum wait time is nearly double the average wait time. Reading, on the other hand, is quite consistent, whereas the writing overhead is negligible.
Zooming in on the input stage, shown in Figure 7-8, we see that it consists of three steps: reading two columns (`duration` and `start_station_name`, referred to as `$1` and `$2`) from the BigQuery table, aggregating, and writing the output.
The aggregation step consists of three operations: grouping by `start_station_name` (recall that `$2` refers to this input column), finding the average `duration` (`$1`) within each shard, and maintaining the count of non-null durations. The writing stage writes the groups (`$30`), average duration (`$20`), and count of duration (`$21`) to an intermediate output, distributing the output by the hash of the station name. In case more than one worker is needed in the next stage, the hash of the station name controls which workers process which parts of the `stage_00` output.
Looking back again at Figure 7-6, you can see that the first stage is carried out in parallel over seven workers, while the remaining two stages are carried out on a single worker. The input stage reads in 24.4 million rows and writes out around 6,140 rows, which totals 348 KB. These rows are sorted by the second stage, and five rows are written to the third stage. When we have a stage with only one worker, we should ensure that the memory load on that worker is well within the bounds of what a single worker can handle (we cover ways to do so later in this chapter)—348 KB definitely qualifies, and so this query should not pose any performance issues due to limits on the resources available to a single worker.
Another option to visualize the BigQuery query plan is to use the BigQuery Visualizer at https://bqvisualiser.appspot.com/, as shown in Figure 7-9.
The visualizer becomes especially useful for complex queries with tens of stages, which can be difficult to comprehend from just the synopsis available in the BigQuery web UI.
As discussed in the previous section, you should carry out the following steps to measure query speed and identify potential problems:
1. Measure the overall workload time using the BigQuery Workload Tester.
2. Examine the logs to ensure that the workload is not performing any unexpected operations.
3. Examine the query plan information of the queries that form the workload to identify bottlenecks or unnecessary stages.
Once you have identified that a problem exists and have determined that there are no obvious errors in the workflow, it is time to consider how to improve speed on the critical parts of the workload. In this section, we provide a few possible ways to improve query speed, including the following:
- Minimizing I/O
- Caching the results of previous queries
- Performing efficient joins
- Avoiding overwhelming a worker
- Using approximate aggregation functions
As we noted earlier, a query that computes the sum of three columns will be slower than a query that computes the sum of two columns, but most of the performance difference will be due to reading more data, not the extra addition. Therefore, a query that computes the mean of a column will be nearly as fast as a query whose aggregation method is to compute the variance of the data (even though computing variance requires BigQuery to keep track of both the sum and the sum of the squares), because most of the overhead of simple queries is caused by I/O, not by computation.
Because BigQuery uses columnar file formats, the fewer the columns that are read in a `SELECT`, the less data needs to be read. In particular, doing a `SELECT *` reads every column of every row in the table, making it quite slow and expensive. The exception is when you use a `SELECT *` in a subquery and then reference only a few fields in an outer query; the BigQuery optimizer will be smart enough to read only the columns that are absolutely required.
Explicitly list the columns that you want to see in the final result. For example, it is much more efficient to find the `bike_id` responsible for the longest-duration trip in the dataset by doing the following:

```sql
SELECT
  bike_id
  , duration
FROM `bigquery-public-data`.london_bicycles.cycle_hire
ORDER BY duration DESC
LIMIT 1
```
A less efficient method is this:
```sql
SELECT *
FROM `bigquery-public-data`.london_bicycles.cycle_hire
ORDER BY duration DESC
LIMIT 1
```
The first query took us 1.8 seconds and processed 372 MB, whereas the second took 5.5 seconds (three times slower) and processed 2.59 GB (seven times costlier).
Unless you are reading from a clustered table, applying a `LIMIT` clause does not affect the amount of data you are billed for reading. When reading clustered tables, reductions in the number of bytes scanned will be passed along to you as savings (although they will be less predictable). To preview a table, use the preview button in the web UI instead of doing a `SELECT *` with a `LIMIT`. The preview does not incur charges, whereas a `SELECT *` incurs the same charge as a table scan.
If you require nearly all of the columns in a table, consider using `SELECT * EXCEPT` so as to avoid reading the ones you don’t require (see Chapter 2).
When tuning a query, it is important to start with the data that is being read and consider whether it is possible to reduce it. Suppose that you want to find the typical duration of the most common one-way rentals; you could do the following:
```sql
SELECT
  MIN(start_station_name) AS start_station_name
  , MIN(end_station_name) AS end_station_name
  , APPROX_QUANTILES(duration, 10)[OFFSET(5)] AS typical_duration
  , COUNT(duration) AS num_trips
FROM `bigquery-public-data`.london_bicycles.cycle_hire
WHERE start_station_id != end_station_id
GROUP BY start_station_id, end_station_id
ORDER BY num_trips DESC
LIMIT 10
```
This takes 14.7 seconds when we run it, and it yields the following:
| Row | start_station_name | end_station_name | typical_duration | num_trips |
|---|---|---|---|---|
| 1 | Black Lion Gate, Kensington Gardens | Hyde Park Corner, Hyde Park | 1,500 | 12,000 |
| 2 | Black Lion Gate, Kensington Gardens | Palace Gate, Kensington Gardens | 780 | 11,833 |
| 3 | Hyde Park Corner, Hyde Park | Albert Gate, Hyde Park | 1,920 | 11,745 |
| 4 | Hyde Park Corner, Hyde Park | Triangle Car Park, Hyde Park | 1,380 | 10,923 |
| 5 | Hyde Park Corner, Hyde Park | Black Lion Gate, Kensington Gardens | 1,680 | 10,652 |
The details of the query indicate that the sorting (for the approximate quantiles for every station pair) requires a repartition of the outputs of the input stage, but most of the time is spent during computation, as demonstrated in Figure 7-10.
Nevertheless, we can reduce the I/O overhead of the query if we do the filtering and grouping using the station name rather than the station ID, because we will need to read fewer columns:
```sql
SELECT
  start_station_name
  , end_station_name
  , APPROX_QUANTILES(duration, 10)[OFFSET(5)] AS typical_duration
  , COUNT(duration) AS num_trips
FROM `bigquery-public-data`.london_bicycles.cycle_hire
WHERE start_station_name != end_station_name
GROUP BY start_station_name, end_station_name
ORDER BY num_trips DESC
LIMIT 10
```
This query avoids the need to read the two ID columns and finishes in 9.6 seconds, a 30% increase in speed. This increase is caused by the downstream effects of reading less data: the query requires one less repartition, and fewer workers (10, versus 19 earlier) for the sort, as shown in Figure 7-11.
The query result remains the same because there is a 1:1 relationship between the station name and the station ID.
Suppose that you want to find the total distance traveled by each bicycle in the dataset. A naive way to do this would be to find the distance traveled in each trip undertaken by each bicycle and sum up the distances:
```sql
WITH trip_distance AS (
  SELECT
    bike_id
    , ST_Distance(ST_GeogPoint(s.longitude, s.latitude),
                  ST_GeogPoint(e.longitude, e.latitude)) AS distance
  FROM
    `bigquery-public-data`.london_bicycles.cycle_hire,
    `bigquery-public-data`.london_bicycles.cycle_stations s,
    `bigquery-public-data`.london_bicycles.cycle_stations e
  WHERE
    start_station_id = s.id
    AND end_station_id = e.id
)

SELECT
  bike_id
  , SUM(distance)/1000 AS total_distance
FROM trip_distance
GROUP BY bike_id
ORDER BY total_distance DESC
LIMIT 5
```
This query takes 7.1 seconds (44 seconds of slot time) and shuffles 1.69 MB. The result is that some bicycles have been ridden nearly 6,000 kilometers:
| Row | bike_id | total_distance |
|---|---|---|
| 1 | 12925 | 5990.988493972133 |
| 2 | 12757 | 5919.736998793672 |
| 3 | 12496 | 5883.1268196056335 |
| 4 | 12841 | 5870.757769474104 |
| 5 | 13071 | 5853.763514457338 |
Computing the distance is a pretty expensive operation, and we can avoid joining the `cycle_stations` table against the `cycle_hire` table if we precompute the distances between all pairs of stations:
```sql
WITH stations AS (
  SELECT
    s.id AS start_id
    , e.id AS end_id
    , ST_Distance(ST_GeogPoint(s.longitude, s.latitude),
                  ST_GeogPoint(e.longitude, e.latitude)) AS distance
  FROM
    `bigquery-public-data`.london_bicycles.cycle_stations s,
    `bigquery-public-data`.london_bicycles.cycle_stations e
),
```
The rest of the query is quite similar, except that the join is against the table of precomputed distances:
```sql
trip_distance AS (
  SELECT
    bike_id
    , distance
  FROM
    `bigquery-public-data`.london_bicycles.cycle_hire,
    stations
  WHERE
    start_station_id = start_id
    AND end_station_id = end_id
)

SELECT
  bike_id
  , SUM(distance)/1000 AS total_distance
FROM trip_distance
GROUP BY bike_id
ORDER BY total_distance DESC
LIMIT 5
```
Now the query takes only 31.5 seconds of slot time (a 30% increase in speed) in spite of having to shuffle more data (33.44 MB) between nodes.
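The underlying idea, evaluating an expensive function once per distinct key pair instead of once per row, is general. Here is a toy Python sketch (made-up coordinates and our own helper, not the book's SQL) that caches a pairwise distance so repeated trips between the same stations reuse the computed value:

```python
import math
from functools import lru_cache

stations = {  # made-up station coordinates: id -> (longitude, latitude)
    1: (-0.1278, 51.5074),
    2: (-0.1419, 51.5014),
}

@lru_cache(maxsize=None)
def pair_distance(start_id, end_id):
    """Approximate distance in km, computed once per distinct station pair."""
    (lon1, lat1), (lon2, lat2) = stations[start_id], stations[end_id]
    # Equirectangular approximation: adequate for nearby points.
    x = math.radians(lon2 - lon1) * math.cos(math.radians((lat1 + lat2) / 2))
    y = math.radians(lat2 - lat1)
    return 6371 * math.hypot(x, y)

trips = [(1, 2), (1, 2), (2, 1)]  # many trips, few distinct pairs
total = sum(pair_distance(s, e) for s, e in trips)
print(pair_distance.cache_info().misses)  # 2: only two distinct pairs computed
```

The precomputed-distances query applies the same reasoning inside BigQuery: the expensive `ST_Distance` call runs once per station pair rather than once per trip.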
The BigQuery service automatically caches query results in a temporary table. If the identical query is submitted within approximately 24 hours, the results are served from this temporary table without any recomputation. Cached results are extremely fast and do not incur charges.
There are, however, a few caveats to be aware of. Query caching is based on exact string comparison, so even whitespace differences can cause a cache miss. Queries are never cached if they exhibit nondeterministic behavior (for example, they use `CURRENT_TIMESTAMP` or `RAND`), if the table or view being queried has changed (even if the columns/rows of interest to the query are unchanged), if the table is associated with a streaming buffer (even if there are no new rows), if the query uses Data Manipulation Language (DML) statements, or if it queries external data sources.
We recommend against reading directly from the cached temporary tables, because cached tables can expire—if the results of a query can serve as inputs to other queries, we recommend the use of tables or materialized views, as discussed in the next section.
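To see why exact string matching is so strict, consider this toy illustration (the hashing is ours; BigQuery's actual cache mechanism is internal): any byte-level difference, even whitespace, produces a different cache key.

```python
import hashlib

def cache_key(query):
    """Toy stand-in for an exact-string cache key: any byte difference,
    even whitespace, yields a different key and hence a cache miss."""
    return hashlib.sha256(query.encode("utf-8")).hexdigest()

q1 = "SELECT COUNT(*) FROM t"
q2 = "SELECT  COUNT(*) FROM t"  # extra space: same meaning, different string
print(cache_key(q1) == cache_key(q2))  # False
```

This is why generated queries benefit from consistent formatting: two semantically identical queries that differ only in layout will never share a cache entry.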
It is possible to improve overall performance at the expense of increased I/O by taking advantage of temporary tables and materialized views. For example, suppose that you have a number of queries that start out by finding the typical duration of trips between a pair of stations:
```sql
WITH typical_trip AS (
  SELECT
    start_station_name
    , end_station_name
    , APPROX_QUANTILES(duration, 10)[OFFSET(5)] AS typical_duration
    , COUNT(duration) AS num_trips
  FROM `bigquery-public-data`.london_bicycles.cycle_hire
  GROUP BY start_station_name, end_station_name
)
```
The `WITH` clause (also called a common table expression) improves readability but does not improve query speed or cost because results are not cached. The same applies to views and subqueries as well. If you find yourself using a `WITH` clause, a view, or a subquery often, one way to potentially improve performance is to store the result in a table (or materialized view):11
CREATE OR REPLACE TABLE ch07eu.typical_trip AS
SELECT
  start_station_name
  , end_station_name
  , APPROX_QUANTILES(duration, 10)[OFFSET(5)] AS typical_duration
  , COUNT(duration) AS num_trips
FROM `bigquery-public-data`.london_bicycles.cycle_hire
GROUP BY start_station_name, end_station_name
Let’s use the WITH clause to find days when bicycle trips are much longer than usual:
SELECT
  EXTRACT(DATE FROM start_date) AS trip_date
  , APPROX_QUANTILES(duration / typical_duration, 10)[OFFSET(5)] AS ratio
  , COUNT(*) AS num_trips_on_day
FROM `bigquery-public-data`.london_bicycles.cycle_hire AS hire
JOIN typical_trip AS trip
ON hire.start_station_name = trip.start_station_name
   AND hire.end_station_name = trip.end_station_name
   AND num_trips > 10
GROUP BY trip_date
HAVING num_trips_on_day > 10
ORDER BY ratio DESC
LIMIT 10
This takes 19.1 seconds and processes 1.68 GB. Now, let’s use the table:
SELECT
  EXTRACT(DATE FROM start_date) AS trip_date
  , APPROX_QUANTILES(duration / typical_duration, 10)[OFFSET(5)] AS ratio
  , COUNT(*) AS num_trips_on_day
FROM `bigquery-public-data`.london_bicycles.cycle_hire AS hire
JOIN ch07eu.typical_trip AS trip
ON hire.start_station_name = trip.start_station_name
   AND hire.end_station_name = trip.end_station_name
   AND num_trips > 10
GROUP BY trip_date
HAVING num_trips_on_day > 10
ORDER BY ratio DESC
LIMIT 10
This takes 10.3 seconds (roughly half the time, because the aggregation is precomputed) and processes 1.72 GB (a slight increase in cost, because the new table is now being read as well). Both queries return the same result, that trips on Christmas take longer than usual:
Row | trip_date | ratio | num_trips_on_day |
---|---|---|---|
1 | 2016-12-25 | 1.6 | 34477 |
2 | 2015-12-25 | 1.5263157894736843 | 20871 |
3 | 2015-08-01 | 1.25 | 41200 |
4 | 2016-07-30 | 1.2272727272727273 | 43524 |
5 | 2015-08-02 | 1.2222222222222223 | 41243 |
The table ch07eu.typical_trip is not refreshed when new data is added to the cycle_hire table. One way to solve this problem of stale data is to use a materialized view or to schedule queries to update the table periodically. You should measure the cost of such updates to see whether the improvement in query performance makes up for the extra cost of keeping the intermediate table up to date.
If there are tables that you access frequently in Business Intelligence (BI) settings, such as dashboards with aggregations and filters, one way to speed up your queries is to employ BI Engine. It will automatically store relevant pieces of data in memory (either actual columns from the table or derived results) and will use a specialized query processor tuned for working with mostly in-memory data. You can use the BigQuery Admin Console to reserve the amount of memory (up to a current maximum of 10 GB) that BigQuery should use for its cache, as depicted in Figure 7-12.
Make sure to reserve this memory in the same region as the dataset you are querying. Then BigQuery will start to cache tables, parts of tables, and aggregations in memory and serve results faster.
A primary use case for BI Engine is for tables that are accessed from dashboard tools such as Google Data Studio. By providing memory allocation for a BI Engine reservation, you can make dashboards that rely on a BigQuery backend much more responsive.
Joining two tables requires data coordination and is subject to limitations imposed by the communication bandwidth between slots. If it is possible to avoid a join, or to reduce the amount of data being joined, do so.
One way to improve the read performance and avoid joins is to give up on storing data efficiently and instead add redundant copies of data. This is called denormalization. Thus, instead of storing the bicycle station latitudes and longitudes separately from the cycle hire information, we could create a denormalized table:
CREATE OR REPLACE TABLE ch07eu.london_bicycles_denorm AS
SELECT
  start_station_id
  , s.latitude AS start_latitude
  , s.longitude AS start_longitude
  , end_station_id
  , e.latitude AS end_latitude
  , e.longitude AS end_longitude
FROM `bigquery-public-data`.london_bicycles.cycle_hire AS h
JOIN `bigquery-public-data`.london_bicycles.cycle_stations AS s
ON h.start_station_id = s.id
JOIN `bigquery-public-data`.london_bicycles.cycle_stations AS e
ON h.end_station_id = e.id
Then all subsequent queries will not need to carry out the join because the table will contain the necessary location information for all trips:
Row | start_station_id | start_latitude | start_longitude | end_station_id | end_latitude | end_longitude |
---|---|---|---|---|---|---|
1 | 439 | 51.5338 | -0.118677 | 680 | 51.47768469 | -0.170329317 |
2 | 597 | 51.473471 | -0.20782 | 622 | 51.50748124 | -0.205535908 |
3 | 187 | 51.49247977 | -0.178433004 | 187 | 51.49247977 | -0.178433004 |
4 | 15 | 51.51772703 | -0.127854211 | 358 | 51.516226 | -0.124826 |
5 | 638 | 51.46663393 | -0.169821175 | 151 | 51.51213691 | -0.201554966 |
In this case, you are trading increased storage and read volume against the computational expense of a join. It is quite possible that the cost of reading more data from disk will outweigh the savings from avoiding the join—you should measure whether denormalization brings performance benefits for your workload.
Self-joins happen when a table is joined with itself. Although BigQuery supports self-joins, they can lead to performance degradation if the table being joined with itself is very large. In many cases, you can avoid the self-join by taking advantage of SQL features such as aggregation and window functions.
Let’s look at an example. One of the BigQuery public datasets is the dataset of baby names published by the US Social Security Administration. It is possible to query the dataset to find the most common male names for the year 2015 in the state of Massachusetts:
SELECT
  name
  , number AS num_babies
FROM `bigquery-public-data`.usa_names.usa_1910_current
WHERE gender = 'M' AND year = 2015 AND state = 'MA'
ORDER BY num_babies DESC
LIMIT 5
Here’s the result of that query:
Row | name | num_babies |
---|---|---|
1 | Benjamin | 456 |
2 | William | 445 |
3 | Noah | 403 |
4 | Mason | 365 |
5 | James | 354 |
Similarly, the most common female names (gender = 'F') for the year 2015 in Massachusetts were as follows:
Row | name | num_babies |
---|---|---|
1 | Olivia | 430 |
2 | Emma | 402 |
3 | Sophia | 373 |
4 | Isabella | 350 |
5 | Charlotte | 344 |
What are the most common names assigned to both male and female babies in the country over all the years in the dataset? A naive way to solve this problem involves reading the input table twice and doing a self-join:
WITH male_babies AS (
  SELECT name, number AS num_babies
  FROM `bigquery-public-data`.usa_names.usa_1910_current
  WHERE gender = 'M'
), female_babies AS (
  SELECT name, number AS num_babies
  FROM `bigquery-public-data`.usa_names.usa_1910_current
  WHERE gender = 'F'
), both_genders AS (
  SELECT
    name
    , SUM(m.num_babies) + SUM(f.num_babies) AS num_babies
    , SUM(m.num_babies) / (SUM(m.num_babies) + SUM(f.num_babies)) AS frac_male
  FROM male_babies AS m
  JOIN female_babies AS f USING (name)
  GROUP BY name
)
SELECT * FROM both_genders
WHERE frac_male BETWEEN 0.3 AND 0.7
ORDER BY num_babies DESC
LIMIT 5
This took 74 seconds and yielded the following:
Row | name | num_babies | frac_male |
---|---|---|---|
1 | Jordan | 982149616 | 0.6705115608373867 |
2 | Willie | 940460442 | 0.5722103705452823 |
3 | Lee | 820214744 | 0.689061146650151 |
4 | Jessie | 759150003 | 0.5139710590240227 |
5 | Marion | 592706454 | 0.32969114589732473 |
To add insult to injury, the answer is also wrong: as much as we like the name Jordan, the entire US population is only around 330 million, so there cannot have been 982 million babies with that name. The self-join unfortunately joins across state and year boundaries.12
A faster, more elegant (and correct!) solution is to recast the query to read the input only once and avoid the self-join completely. This took only 2.4 seconds—a 30-times increase in speed:
WITH all_babies AS (
  SELECT
    name
    , SUM(IF(gender = 'M', number, 0)) AS male_babies
    , SUM(IF(gender = 'F', number, 0)) AS female_babies
  FROM `bigquery-public-data.usa_names.usa_1910_current`
  GROUP BY name
), both_genders AS (
  SELECT
    name
    , (male_babies + female_babies) AS num_babies
    , SAFE_DIVIDE(male_babies, male_babies + female_babies) AS frac_male
  FROM all_babies
  WHERE male_babies > 0 AND female_babies > 0
)
SELECT * FROM both_genders
WHERE frac_male BETWEEN 0.3 AND 0.7
ORDER BY num_babies DESC
LIMIT 5
Here’s the result, in case you’re curious:
Row | name | num_babies | frac_male |
---|---|---|---|
1 | Jessie | 229263 | 0.4327213723976394 |
2 | Riley | 187762 | 0.46760792918694943 |
3 | Casey | 181176 | 0.5916456925862145 |
4 | Jackie | 161428 | 0.4624042916966078 |
5 | Johnnie | 136208 | 0.6842549629977681 |
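The arithmetic behind the inflated numbers is easy to reproduce in miniature. In the toy example below (hypothetical rows, not the real dataset), joining per-(state, year) rows on name alone produces a cross product, while a single-pass conditional aggregation, the SUM(IF(...)) pattern, gives the right answer:

```python
# Each row: (name, gender, state, year, number)
rows = [
    ("Jordan", "M", "MA", 2014, 10),
    ("Jordan", "M", "NY", 2015, 20),
    ("Jordan", "F", "MA", 2014, 5),
    ("Jordan", "F", "NY", 2015, 15),
]

# Naive self-join on name: every M row pairs with every F row,
# so each side's counts are summed once per row on the other side.
males = [r for r in rows if r[1] == "M"]
females = [r for r in rows if r[1] == "F"]
joined_male = sum(m[4] for m in males for f in females)    # (10+20) * 2 = 60
joined_female = sum(f[4] for m in males for f in females)  # (5+15) * 2 = 40

# Single pass with conditional aggregation (SUM(IF(...)) in SQL):
male = sum(r[4] for r in rows if r[1] == "M")    # 30
female = sum(r[4] for r in rows if r[1] == "F")  # 20

print(joined_male, joined_female, male, female)  # 60 40 30 20
```

With more states and years on each side, the multiplier grows accordingly, which is how the naive query reached 982 million Jordans.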
It is possible to carry out the previous query with an efficient join as long as we reduce the amount of data being joined by grouping the data by name and gender early on:
WITH all_names AS (
  SELECT name, gender, SUM(number) AS num_babies
  FROM `bigquery-public-data`.usa_names.usa_1910_current
  GROUP BY name, gender
), male_names AS (
  SELECT name, num_babies FROM all_names WHERE gender = 'M'
), female_names AS (
  SELECT name, num_babies FROM all_names WHERE gender = 'F'
), ratio AS (
  SELECT
    name
    , (f.num_babies + m.num_babies) AS num_babies
    , m.num_babies / (f.num_babies + m.num_babies) AS frac_male
  FROM male_names AS m
  JOIN female_names AS f USING (name)
)
SELECT * FROM ratio
WHERE frac_male BETWEEN 0.3 AND 0.7
ORDER BY num_babies DESC
LIMIT 5
The early grouping serves to trim the data early in the query, before the query performs a JOIN. That way, shuffling and other complex operations execute only on the much smaller data and remain quite efficient. This query finished in two seconds and returned the correct result.
Suppose that you want to find the duration between a bike being dropped off and it being rented again; in other words, the duration that a bicycle stays at the station. This is an example of a dependent relationship between rows. It might appear that the only way to solve this is to join the table with itself, matching the end_date of one trip against the start_date of the next.
You can, however, avoid a self-join by using a window function (we cover window functions in Chapter 8):
SELECT
  bike_id
  , start_date
  , end_date
  , TIMESTAMP_DIFF(
      start_date,
      LAG(end_date) OVER (PARTITION BY bike_id ORDER BY start_date),
      SECOND) AS time_at_station
FROM `bigquery-public-data`.london_bicycles.cycle_hire
LIMIT 5
Here’s the result of that query:
Row | bike_id | start_date | end_date | time_at_station |
---|---|---|---|---|
1 | 2 | 2015-01-05 15:59:00 UTC | 2015-01-05 16:17:00 UTC | null |
2 | 2 | 2015-01-07 01:31:00 UTC | 2015-01-07 01:50:00 UTC | 119640 |
3 | 2 | 2015-01-21 07:56:00 UTC | 2015-01-21 08:12:00 UTC | 1231560 |
4 | 2 | 2015-01-21 16:15:00 UTC | 2015-01-21 16:31:00 UTC | 28980 |
5 | 2 | 2015-01-21 16:57:00 UTC | 2015-01-21 17:23:00 UTC | 1560 |
Notice that the first row has a null for time_at_station because we don’t have a timestamp for the previous dropoff. After that, time_at_station tracks the difference between the previous dropoff and the current pickup.
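The LAG computation can be mimicked in ordinary Python to make the windowing explicit: sort each bike's trips by start time, then difference each start against the previous trip's end. The trips below are hypothetical, with timestamps as plain seconds:

```python
from itertools import groupby
from operator import itemgetter

# (bike_id, start_ts, end_ts) in seconds; toy trips, not the real dataset
trips = [
    (2, 100, 200),
    (2, 500, 650),
    (7, 50, 90),
    (7, 400, 420),
]

def time_at_station(trips):
    """LAG(end_date) OVER (PARTITION BY bike_id ORDER BY start_date),
    then start - previous_end, with None for the first row per bike."""
    out = []
    ordered = sorted(trips, key=itemgetter(0, 1))   # partition key, order key
    for bike, group in groupby(ordered, key=itemgetter(0)):
        prev_end = None                             # first row per partition: NULL
        for _, start, end in group:
            out.append((bike, start, None if prev_end is None else start - prev_end))
            prev_end = end
    return out

print(time_at_station(trips))
# [(2, 100, None), (2, 500, 300), (7, 50, None), (7, 400, 310)]
```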
Using this, we can compute the average time that a bicycle is unused at each station and rank stations by that measure:
WITH unused AS (
  SELECT
    bike_id
    , start_station_name
    , start_date
    , end_date
    , TIMESTAMP_DIFF(
        start_date,
        LAG(end_date) OVER (PARTITION BY bike_id ORDER BY start_date),
        SECOND) AS time_at_station
  FROM `bigquery-public-data`.london_bicycles.cycle_hire
)
SELECT
  start_station_name
  , AVG(time_at_station) AS unused_seconds
FROM unused
GROUP BY start_station_name
ORDER BY unused_seconds ASC
LIMIT 5
From this query, we learn that bicycles turn over the fastest at the following stations:
Row | start_station_name | unused_seconds |
---|---|---|
1 | LSP1 | 1500.0 |
2 | Wormwood Street, Liverpool Street | 4605.427372968633 |
3 | Hyde Park Corner, Hyde Park | 5369.884926322234 |
4 | Speakers’ Corner 1, Hyde Park | 6203.571977906734 |
5 | Albert Gate, Hyde Park | 6258.720194303267 |
Sometimes it can be helpful to precompute functions on smaller tables and then join with the precomputed values rather than repeat an expensive calculation each time.
For example, suppose that you want to find the pair of stations between which our customers ride bicycles at the fastest pace. To compute the pace13 (minutes per kilometer) at which they ride, we need to divide the duration of the ride by the distance between stations.
We could create a denormalized table with distances between stations and then compute the average pace:
WITH denormalized_table AS (
  SELECT
    start_station_name
    , end_station_name
    , ST_DISTANCE(ST_GeogPoint(s1.longitude, s1.latitude),
                  ST_GeogPoint(s2.longitude, s2.latitude)) AS distance
    , duration
  FROM `bigquery-public-data`.london_bicycles.cycle_hire AS h
  JOIN `bigquery-public-data`.london_bicycles.cycle_stations AS s1
  ON h.start_station_id = s1.id
  JOIN `bigquery-public-data`.london_bicycles.cycle_stations AS s2
  ON h.end_station_id = s2.id
), durations AS (
  SELECT
    start_station_name
    , end_station_name
    , MIN(distance) AS distance
    , AVG(duration) AS duration
    , COUNT(*) AS num_rides
  FROM denormalized_table
  WHERE duration > 0 AND distance > 0
  GROUP BY start_station_name, end_station_name
  HAVING num_rides > 100
)
SELECT
  start_station_name
  , end_station_name
  , distance
  , duration
  , duration/distance AS pace
FROM durations
ORDER BY pace ASC
LIMIT 5
This query invokes the geospatial function ST_DISTANCE once for each row in the cycle_hire table (24 million times), takes 16.1 seconds, and processes 1.86 GB.
Alternatively, we can use the cycle_stations table to precompute the distance between every pair of stations (this is a self-join) and then join it with the reduced-size table of average duration between stations:
WITH distances AS (
  SELECT
    a.id AS start_station_id
    , a.name AS start_station_name
    , b.id AS end_station_id
    , b.name AS end_station_name
    , ST_DISTANCE(ST_GeogPoint(a.longitude, a.latitude),
                  ST_GeogPoint(b.longitude, b.latitude)) AS distance
  FROM `bigquery-public-data`.london_bicycles.cycle_stations a
  CROSS JOIN `bigquery-public-data`.london_bicycles.cycle_stations b
  WHERE a.id != b.id
), durations AS (
  SELECT
    start_station_id
    , end_station_id
    , AVG(duration) AS duration
    , COUNT(*) AS num_rides
  FROM `bigquery-public-data`.london_bicycles.cycle_hire
  WHERE duration > 0
  GROUP BY start_station_id, end_station_id
  HAVING num_rides > 100
)
SELECT
  start_station_name
  , end_station_name
  , distance
  , duration
  , duration/distance AS pace
FROM distances
JOIN durations USING (start_station_id, end_station_id)
ORDER BY pace ASC
LIMIT 5
The recast query with the more efficient joins takes only 5.4 seconds (three times faster) and processes 554 MB (more than a threefold reduction in cost).
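The principle at work—evaluate an expensive function once per distinct key rather than once per row—can be demonstrated with a call counter in Python. The data is hypothetical, and the Euclidean distance simply stands in for ST_DISTANCE:

```python
from math import hypot

calls = 0

def distance(a, b):
    """Stand-in for an expensive per-row function such as ST_DISTANCE."""
    global calls
    calls += 1
    return hypot(a[0] - b[0], a[1] - b[1])

stations = {"A": (0.0, 0.0), "B": (3.0, 4.0)}
# Six rides, but only two distinct station pairs
rides = [("A", "B")] * 4 + [("B", "A")] * 2

# Per-row evaluation: one call per ride
calls = 0
per_row = [distance(stations[s], stations[e]) for s, e in rides]
per_row_calls = calls  # 6

# Precompute once per distinct pair, then do cheap lookups (the join)
calls = 0
pair_dist = {(s, e): distance(stations[s], stations[e]) for (s, e) in set(rides)}
precomputed = [pair_dist[(s, e)] for s, e in rides]
precomputed_calls = calls  # 2

assert per_row == precomputed
print(per_row_calls, precomputed_calls)  # 6 2
```

With 24 million rides but only hundreds of stations, the gap between "once per row" and "once per distinct pair" becomes enormous.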
What if we were to store the distance traveled in each trip in a denormalized table?
CREATE OR REPLACE TABLE ch07eu.cycle_hire AS
SELECT
  start_station_name
  , end_station_name
  , ST_DISTANCE(ST_GeogPoint(s1.longitude, s1.latitude),
                ST_GeogPoint(s2.longitude, s2.latitude)) AS distance
  , duration
FROM `bigquery-public-data`.london_bicycles.cycle_hire AS h
JOIN `bigquery-public-data`.london_bicycles.cycle_stations AS s1
ON h.start_station_id = s1.id
JOIN `bigquery-public-data`.london_bicycles.cycle_stations AS s2
ON h.end_station_id = s2.id
Querying this table returns results in 8.7 seconds and processes 1.6 GB—in other words, it’s 60% slower and about three times more expensive than the previous query. In this instance, therefore, joining with a smaller table turns out to be more efficient than querying a larger, denormalized table. However, this is the sort of thing that you need to measure for your particular use case. You will see later how you can efficiently store data at differing levels of granularity in a single denormalized table with nested and repeated fields.
Some operations (e.g., ordering) need to be carried out on a single worker. Having to sort too much data can overwhelm a worker’s memory and result in a “resources exceeded” error, so avoid funneling too much data to any one worker. As the hardware in Google datacenters is upgraded, what “too much” means in this context expands over time; currently, it is on the order of one gigabyte.
Suppose that you want to go through the bike rentals and number them 1, 2, 3, and so on, in the order that the rentals ended. We could do that by using the ROW_NUMBER() function (we cover window functions in Chapter 8):
SELECT
  rental_id
  , ROW_NUMBER() OVER(ORDER BY end_date) AS rental_number
FROM `bigquery-public-data`.london_bicycles.cycle_hire
ORDER BY rental_number ASC
LIMIT 5
Here’s the result:
Row | rental_id | rental_number |
---|---|---|
1 | 40346512 | 1 |
2 | 40346508 | 2 |
3 | 40346519 | 3 |
4 | 40346510 | 4 |
5 | 40346520 | 5 |
However, this query takes 29.9 seconds to process just 372 MB because it needs to sort the entirety of the london_bicycles dataset on a single worker. Had we processed a larger dataset, it would have overwhelmed that worker.
In such cases, we might want to consider whether it is possible to limit the large sorts and distribute them. Indeed, it is possible to extract the date from the rentals and then sort trips within each day:
WITH rentals_on_day AS (
  SELECT
    rental_id
    , end_date
    , EXTRACT(DATE FROM end_date) AS rental_date
  FROM `bigquery-public-data.london_bicycles.cycle_hire`
)
SELECT
  rental_id
  , rental_date
  , ROW_NUMBER() OVER(PARTITION BY rental_date ORDER BY end_date)
      AS rental_number_on_day
FROM rentals_on_day
ORDER BY rental_date ASC, rental_number_on_day ASC
LIMIT 5
This takes 8.9 seconds (an increase in speed of three times) because the sorting can be done on just a single day of data at a time. It yields the rental number on a day-by-day basis:
Row | rental_id | rental_date | rental_number_on_day |
---|---|---|---|
1 | 40346512 | 2015-01-04 | 1 |
2 | 40346508 | 2015-01-04 | 2 |
3 | 40346519 | 2015-01-04 | 3 |
4 | 40346510 | 2015-01-04 | 4 |
5 | 40346520 | 2015-01-04 | 5 |
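The partition-then-number pattern corresponds to sorting each group independently rather than sorting everything globally. Sketched in Python with toy rentals (hypothetical IDs and dates):

```python
from collections import defaultdict

# (rental_id, end_date) where end_date is "YYYY-MM-DD HH:MM"
rentals = [
    (103, "2015-01-04 09:00"),
    (101, "2015-01-04 08:00"),
    (202, "2015-01-05 10:30"),
    (201, "2015-01-05 07:15"),
]

# ROW_NUMBER() OVER (PARTITION BY rental_date ORDER BY end_date):
# bucket by day first, then sort only within each (much smaller) bucket.
by_day = defaultdict(list)
for rid, end in rentals:
    by_day[end[:10]].append((rid, end))

numbered = []
for day in sorted(by_day):
    for n, (rid, end) in enumerate(sorted(by_day[day], key=lambda r: r[1]), 1):
        numbered.append((rid, day, n))

print(numbered)
# [(101, '2015-01-04', 1), (103, '2015-01-04', 2),
#  (201, '2015-01-05', 1), (202, '2015-01-05', 2)]
```

Each per-day sort is independent, which is exactly what lets BigQuery spread the work across many workers instead of one.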
The same problem of overwhelming a worker (in this case, overwhelming the memory of the worker) can happen during an ARRAY_AGG with GROUP BY if one of the keys is much more common than the others.14
Because there are more than three million GitHub repositories and the commits are well distributed among them, this query succeeds:
SELECT
  repo_name
  , ARRAY_AGG(STRUCT(author, committer, subject, message,
                     trailer, difference, encoding)
              ORDER BY author.date.seconds)
FROM `bigquery-public-data.github_repos.commits`, UNNEST(repo_name) AS repo_name
GROUP BY repo_name
However, most of the people using GitHub live in only a few time zones, so grouping by the time zone fails—we are asking a single worker to sort a significant fraction of 750 GB:
SELECT
  author.tz_offset
  , ARRAY_AGG(STRUCT(author, committer, subject, message,
                     trailer, difference, encoding)
              ORDER BY author.date.seconds)
FROM `bigquery-public-data.github_repos.commits`
GROUP BY author.tz_offset
One solution is to add a LIMIT to the ORDER BY:
SELECT
  author.tz_offset
  , ARRAY_AGG(STRUCT(author, committer, subject, message,
                     trailer, difference, encoding)
              ORDER BY author.date.seconds LIMIT 1000)
FROM `bigquery-public-data.github_repos.commits`
GROUP BY author.tz_offset
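The reason the LIMIT helps is that keeping only the first N elements per group requires a bounded buffer instead of a full in-memory sort. Python's heapq expresses the same idea (illustrative data):

```python
import heapq
import random

random.seed(42)
# 100,000 (sort_key, payload) pairs standing in for commits in one group
commits = [(random.random(), f"commit-{i}") for i in range(100_000)]

# Full sort: must hold and order all 100,000 elements in memory.
full = sorted(commits)[:1000]

# Bounded: a heap of at most 1,000 elements; O(n log k) time, O(k) memory.
bounded = heapq.nsmallest(1000, commits)

assert full == bounded
```

The two approaches return identical results here; the difference is that the bounded version never needs to materialize the whole sorted group, which is what saves the hot-key worker.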
If you do require sorting all of the data, use more granular keys (i.e., distribute the group’s data over more workers) and then aggregate the results corresponding to the desired key. For example, instead of grouping only by the time zone, it is possible to group by time zone and repo_name and then aggregate across repositories to get the actual answer for each time zone:
SELECT
  repo_name, author.tz_offset
  , ARRAY_AGG(STRUCT(author, committer, subject, message,
                     trailer, difference, encoding)
              ORDER BY author.date.seconds)
FROM `bigquery-public-data.github_repos.commits`, UNNEST(repo_name) AS repo_name
GROUP BY repo_name, author.tz_offset
Invoking a JavaScript user-defined function (UDF) requires launching a V8 subprocess, which degrades performance. JavaScript UDFs are computationally expensive and have access to limited memory, so reducing the amount of data processed by the UDF can help improve performance.
Although BigQuery supports UDFs in JavaScript, opt to write your UDFs using SQL wherever possible; SQL is distributed and optimized by BigQuery natively. If you are writing a UDF in SQL, there is no performance difference between embedding the SQL function directly in the query or using temporary and permanent functions. The reason to use an SQL UDF is for reusability, composability, and readability.
BigQuery provides fast, low-memory approximations of aggregate functions. Instead of using COUNT(DISTINCT …), we can use APPROX_COUNT_DISTINCT on large data streams when a small statistical uncertainty in the result is tolerable.
For example, you can find the number of unique GitHub repositories by using:
SELECT COUNT(DISTINCT repo_name) AS num_repos FROM `bigquery-public-data`.github_repos.commits, UNNEST(repo_name) AS repo_name
This query takes 7.1 seconds to compute the correct result of 3,348,576. On the other hand, the following query takes 3.2 seconds (an increase in speed of two times) and returns an approximate result of 3,400,927, which overestimates the correct answer by 1.5%:
SELECT APPROX_COUNT_DISTINCT(repo_name) AS num_repos FROM `bigquery-public-data`.github_repos.commits, UNNEST(repo_name) AS repo_name
On smaller datasets, however, there might be no advantage. Let’s look at an example that finds the total number of unique bicycles in the london_bicycles dataset:
SELECT COUNT(DISTINCT bike_id) AS num_bikes FROM `bigquery-public-data`.london_bicycles.cycle_hire
This takes 0.9 seconds and returns the correct result of 13,705. Using the approximate counterpart takes 1.6 seconds (which is slower than the exact query) and returns an approximate result of 13,699:
SELECT APPROX_COUNT_DISTINCT(bike_id) AS num_bikes FROM `bigquery-public-data`.london_bicycles.cycle_hire
The approximate algorithm is much more efficient than the exact algorithm only on large datasets and is recommended in use cases for which errors of approximately 1% are tolerable. Before using the approximate function, always measure on your use case!
Other available approximate functions include APPROX_QUANTILES to compute percentiles, APPROX_TOP_COUNT to find the most frequent elements, and APPROX_TOP_SUM to find the top elements ranked by the sum of an associated value.
Here’s an example of using APPROX_TOP_COUNT to find the five most frequently rented bicycles:
SELECT APPROX_TOP_COUNT(bike_id, 5) AS num_bikes FROM `bigquery-public-data`.london_bicycles.cycle_hire
This yields the following:
Row | num_bikes.value | num_bikes.count |
---|---|---|
1 | 12925 | 2922 |
12841 | 2489 | |
13071 | 2474 | |
12926 | 2467 | |
12991 | 2444 |
Note that the result is a single row and consists of an array of values so that ordering is preserved.
If your queries are taking too long, you can use APPROX_TOP_COUNT to check whether data skew is the reason. If so, consider the tips earlier in the chapter on dealing with data skew by carrying out the operation at a more granular level or using LIMIT to reduce the data being processed.
To find the top five stations based on duration of bicycle rentals, you can use APPROX_TOP_SUM:
SELECT APPROX_TOP_SUM(start_station_name, duration, 5) AS num_bikes FROM `bigquery-public-data`.london_bicycles.cycle_hire WHERE duration > 0
Here is the result of that query:
Row | num_bikes.value | num_bikes.sum |
---|---|---|
1 | Hyde Park Corner, Hyde Park | 600037440 |
Black Lion Gate, Kensington Gardens | 581085720 | |
Albert Gate, Hyde Park | 367235700 | |
Speakers’ Corner 1, Hyde Park | 318485820 | |
Speakers’ Corner 2, Hyde Park | 268442640 |
In addition to the just-described APPROX_* functions (which carry out the entire approximate aggregation algorithm), BigQuery also supports the HyperLogLog++ (HLL++) algorithm, which allows you to break down the count-distinct problem into three separate operations:
1. Initialize a set, called an HLL sketch, by adding new elements to it by using HLL_COUNT.INIT.
2. Find the cardinality (count) of an HLL sketch by using HLL_COUNT.EXTRACT.
3. Merge two HLL sketches into a single sketch by using HLL_COUNT.MERGE_PARTIAL.
In addition, HLL_COUNT.MERGE combines steps 2 and 3, computing the count directly from a set of HLL sketches.
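To build intuition for what a sketch is, here is a deliberately simplified HyperLogLog in Python. This is not BigQuery's HLL++ implementation, just the core idea: hash each value, bucket it by its leading bits, remember the longest run of leading zeros seen per bucket, and merge two sketches with a bucketwise max.

```python
import hashlib
import math

P = 12          # 2^12 = 4096 buckets; standard error ~1.04/sqrt(4096) ~ 1.6%
M = 1 << P

def init_sketch(values):
    """Analogue of HLL_COUNT.INIT: fold values into a fixed-size sketch."""
    buckets = [0] * M
    for v in values:
        h = int.from_bytes(hashlib.sha256(str(v).encode()).digest()[:8], "big")
        idx = h >> (64 - P)                      # leading P bits pick the bucket
        rest = h & ((1 << (64 - P)) - 1)
        rank = (64 - P) - rest.bit_length() + 1  # leading-zero run + 1
        buckets[idx] = max(buckets[idx], rank)
    return buckets

def merge(a, b):
    """Analogue of HLL_COUNT.MERGE_PARTIAL: bucketwise max, order-independent."""
    return [max(x, y) for x, y in zip(a, b)]

def extract(buckets):
    """Analogue of HLL_COUNT.EXTRACT: harmonic-mean cardinality estimator."""
    alpha = 0.7213 / (1 + 1.079 / M)
    est = alpha * M * M / sum(2.0 ** -b for b in buckets)
    zeros = buckets.count(0)
    if est <= 2.5 * M and zeros:                 # small-range correction
        est = M * math.log(M / zeros)
    return est

s1 = init_sketch(range(0, 60_000))
s2 = init_sketch(range(40_000, 100_000))         # overlaps s1
estimate = extract(merge(s1, s2))                # true cardinality: 100,000
print(round(estimate))
```

Note that merging is a union, not an addition: the overlapping 20,000 values are not double-counted, which is exactly why stored sketches can be re-aggregated safely.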
For example, here’s a query that finds the count of distinct stations in the London bicycles table regardless of whether a trip started or ended at the station:
WITH sketch AS (
  SELECT
    HLL_COUNT.INIT(start_station_name) AS hll_start
    , HLL_COUNT.INIT(end_station_name) AS hll_end
  FROM `bigquery-public-data`.london_bicycles.cycle_hire
)
SELECT
  HLL_COUNT.MERGE(hll_start) AS distinct_start
  , HLL_COUNT.MERGE(hll_end) AS distinct_end
  , HLL_COUNT.MERGE(hll_both) AS distinct_station
FROM sketch, UNNEST([hll_start, hll_end]) AS hll_both
This returns the following:
Row | distinct_start | distinct_end | distinct_station |
---|---|---|---|
1 | 880 | 882 | 882 |
Of course, you can also achieve this by using APPROX_COUNT_DISTINCT directly:
SELECT
  APPROX_COUNT_DISTINCT(start_station_name) AS distinct_start
  , APPROX_COUNT_DISTINCT(end_station_name) AS distinct_end
  , APPROX_COUNT_DISTINCT(both_stations) AS distinct_station
FROM `bigquery-public-data`.london_bicycles.cycle_hire
  , UNNEST([start_station_name, end_station_name]) AS both_stations
This yields the same result and is much simpler to read and understand. Mostly, therefore, you would use the APPROX_* variants.
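The UNNEST([start_station_name, end_station_name]) trick effectively turns each row into one row per column value; counting distinct values over that flattened stream is equivalent to this Python expression (toy trips, not the real dataset):

```python
trips = [
    ("Hyde Park Corner", "Albert Gate"),
    ("Albert Gate", "Hyde Park Corner"),
    ("Wormwood Street", "Albert Gate"),
]

# Flatten both columns into one stream, as UNNEST([a, b]) does,
# then count distinct values once over the combined stream.
both = {station for trip in trips for station in trip}
print(len(both))  # 3
```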
One reason to use the HLL functions is if you need to employ manual aggregation or to avoid storing specific columns. Suppose that your data has the schema user_id, date, product, country, and you need to compute the number of distinct users. However, the user_id column is personally identifying, so you would prefer not to store it indefinitely. In such a case, you can compute a manual aggregation using HLL_COUNT.INIT, as follows:
INSERT INTO approx_distinct_users_agg
SELECT
  date, product, country
  , HLL_COUNT.INIT(user_id) AS sketch
FROM user_events  -- placeholder: the source table is not named in the text
GROUP BY date, product, country
Now you don’t need to store user_id; you only need to store the sketch. Whenever you need to compute any higher-level aggregation, you can do the following:
SELECT date, HLL_COUNT.MERGE(sketch) FROM approx_distinct_users_agg GROUP BY date
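The workflow—store one sketch per fine-grained group, then merge sketches at query time—can be mimicked with Python sets standing in for sketches (exact rather than approximate, but the merge semantics are the same: a union, never an addition):

```python
# One "sketch" (here: an exact set of hypothetical user IDs)
# per (date, product) group, as the INSERT above would produce.
daily = {
    ("2019-06-01", "search"): {"u1", "u2", "u3"},
    ("2019-06-01", "maps"):   {"u2", "u4"},
    ("2019-06-02", "search"): {"u1", "u5"},
}

# Roll up to distinct users per date: merge (union) the stored sketches.
# Summing the per-group counts would double-count u2 on 2019-06-01.
per_date = {}
for (date, _product), users in daily.items():
    per_date.setdefault(date, set()).update(users)

counts = {d: len(u) for d, u in per_date.items()}
print(counts)  # {'2019-06-01': 4, '2019-06-02': 2}
```

This is why sketches are safe to re-aggregate at any level of the hierarchy, while stored distinct counts are not.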
In the previous section, we discussed how to improve query performance, but we limited ourselves to methods that do not change the layout of the table, where it is stored, or how it is accessed. In this section, we look at how addressing these factors can have a dramatic impact on query performance. Obviously, you will want to keep these tips in mind as you design your tables, because changing the schema of a table tends to break existing queries.
BigQuery is a regional service that is globally accessible. If you are querying a dataset that resides in the EU region, for example, the query will run on computational resources that are located in an EU datacenter. If you store the results of the query into a destination table, that table must be in a dataset that is also in the EU. You can, however, invoke the BigQuery REST API (i.e., invoke the query) from anywhere in the world, even from machines outside of GCP.
When working with other GCP resources such as Google Cloud Storage or Cloud Pub/Sub, the best performance will be obtained if you also locate them in the same region as the dataset. Thus, for example, if you are invoking a query from a Compute Engine instance or a Cloud Dataproc cluster, network overhead will be minimized if the instance or cluster is also located in the same region as the dataset being queried.
If you’re invoking BigQuery from outside GCP, consider the network topology and try to minimize the number of hops between the client machine and the GCP datacenter in which the dataset resides.
When invoking the REST API directly, you can minimize network overhead by accepting compressed, partial responses. To accept compressed responses, you can specify in the HTTP header that you will accept gzip and make sure that the string “gzip” appears in the name of the user-agent—for example:
Accept-Encoding: gzip
User-Agent: programName (gzip)
Then the responses are compressed using gzip.
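You can get a local feel for the payload savings: JSON responses are highly repetitive and typically compress well under gzip. The sample payload below is fabricated for illustration:

```python
import gzip
import json

# A fabricated, repetitive API-style payload
payload = json.dumps(
    {"rows": [{"rental_id": i, "station": "Hyde Park Corner"} for i in range(1000)]}
).encode()

compressed = gzip.compress(payload)
print(len(payload), len(compressed))
assert len(compressed) < len(payload) / 3  # repetitive JSON shrinks substantially
```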
By default, responses from BigQuery contain all of the fields promised in the documentation. However, if we know what part of the response we are interested in, we can ask BigQuery to send back only that bit of the response, thus lowering the network overhead. For example, earlier in this chapter, we looked at how to get back the complete job details using the Jobs API. If you are interested in only a subset of the full response (for example, only the steps in the query plan), you can specify the field(s) of interest to limit the size of the response:15
JOBSURL="https://www.googleapis.com/bigquery/v2/projects/$PROJECT/jobs"
FIELDS="statistics(query(queryPlan(steps)))"
curl --silent \
  -H "Authorization: Bearer $access_token" \
  -H "Accept-Encoding: gzip" \
  -H "User-Agent: get_job_details (gzip)" \
  -X GET "${JOBSURL}/${JOBID}?fields=${FIELDS}" \
| zcat
Note that we are also specifying that we accept gzip encoding.
When using the REST API, it is possible to batch multiple BigQuery API calls by using the multipart/mixed content type and nesting an HTTP request in each part. The body of each part specifies the HTTP operation (GET, PUT, etc.), the path portion of the URL, headers, and body. The server’s response is a single HTTP response with a multipart/mixed content type, with the parts being responses (in order) to the requests that form the batched request. Although the responses are in order, the server might execute the calls in any order, so you should treat a batched request as a parallel execution.
Here’s an example of sending a batched request to get some query plan details of the last five queries in our project. You first use the BigQuery command-line tool to get the five most recent successful jobs:16
# The 5 most recent successful jobs
JOBS=$(bq ls -j -n 50 | grep SUCCESS | head -5 | awk '{print $1}')
The request goes to the batch endpoint for BigQuery:
BATCHURL="https://www.googleapis.com/batch/bigquery/v2"
JOBSPATH="/projects/$PROJECT/jobs"
FIELDS="statistics(query(queryPlan(steps)))"
Using the URL path, you can form the individual requests:
request=""
for JOBID in $JOBS; do
  read -d '' part << EOF
--batch_part_starts_here
GET ${JOBSPATH}/${JOBID}?fields=${FIELDS}

EOF
  request=$(echo "$request"; echo "$part")
done
Then you can send the request to the batch endpoint as a multipart request:
curl --silent \
  -H "Authorization: Bearer $access_token" \
  -H "Content-Type: multipart/mixed; boundary=batch_part_starts_here" \
  -X POST \
  -d "$request" \
  "${BATCHURL}"
In Chapter 5, we discussed using the BigQuery REST API and associated client libraries to list table data and get query results. The REST API provides the data in record-oriented, paginated views that are more conducive to relatively small result sets. Yet, with the advent of machine learning and distributed Extract, Transform, and Load (ETL) tools, external tools now require fast, efficient bulk access to BigQuery’s managed storage. Such bulk read access is provided by the BigQuery Storage API via a Remote Procedure Call (RPC)–based protocol. With the BigQuery Storage API, structured data is sent over the wire in a binary serialization format that maps more closely to the columnar format in which the data is stored. This allows for additional parallelism among multiple consumers for a set of results.
It is unlikely that you will use the BigQuery Storage API directly17 if you are an end user. Instead, you will take advantage of tools such as Cloud Dataflow, Cloud Dataproc, TensorFlow, AutoML, and others that employ the Storage API to read directly from BigQuery’s managed storage instead of going through the BigQuery API. Because the Storage API directly accesses stored data, permission to access the BigQuery Storage API is distinct from the existing BigQuery API. The BigQuery Storage API must be enabled independently of enabling BigQuery.
The BigQuery Storage API provides several benefits to tools that read directly from BigQuery’s managed storage. For example, consumers can read a disjoint set of rows from a table using multiple streams (enabling distributed reads from different workers in Cloud Dataproc, for example), dynamically shard these streams (thus reducing tail latency, which can be a significant problem for MapReduce jobs), select a subset of columns to read (enabling machine learning frameworks to read only the features used by the model), filter column values (reducing the data transmitted over the network), and still ensure snapshot consistency (i.e., read data as of a specific point in time).
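To make the stream model concrete, here is a toy simulation of two of those benefits, disjoint streams and column projection. This is not the real Storage API; the table, column names, and stream count are made up, and the "streams" are just slices over an in-memory columnar table.

```python
# Illustrative sketch (not the actual Storage API): how disjoint streams
# and column selection let multiple consumers share one result set.
table = {
    "station": ["a", "b", "c", "d", "e", "f"],
    "duration": [10, 20, 30, 40, 50, 60],
    "bike_id": [1, 2, 3, 4, 5, 6],
}

def open_streams(table, columns, num_streams):
    """Split row indices into disjoint ranges, one per stream, and
    project only the requested columns (as the Storage API allows)."""
    n = len(next(iter(table.values())))
    step = -(-n // num_streams)  # ceiling division
    for start in range(0, n, step):
        rows = range(start, min(start + step, n))
        yield [{c: table[c][i] for c in columns} for i in rows]

streams = list(open_streams(table, ["station", "duration"], 3))
assert len(streams) == 3                   # disjoint shards
assert sum(len(s) for s in streams) == 6   # together they cover the table
assert set(streams[0][0]) == {"station", "duration"}  # column projection
```

Each stream could be handed to a different worker, which is what enables the distributed reads mentioned above.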
In Chapter 5, we looked at using the Jupyter Magics %%bigquery to load the result of queries into pandas DataFrames. However, those datasets were relatively small—on the order of a dozen to a few hundred rows. What if you want to load the entire london_bicycles dataset (24 million rows) into a pandas DataFrame? In that case, it is possible to specify an option on the Magics to load the data into the pandas DataFrame using the Storage API rather than the BigQuery API. You will need to first install the Storage API Python client library with Avro and pandas support. You can do this within Jupyter by using the following:
%pip install google-cloud-bigquery-storage[fastavro,pandas]
Then use the %%bigquery Magics as before, but add the Storage API option:
%%bigquery df --use_bqstorage_api --project $PROJECT
SELECT 
  start_station_name 
  , end_station_name 
  , start_date 
  , duration 
FROM `bigquery-public-data`.london_bicycles.cycle_hire
Note that we are taking advantage of the ability of the Storage API to provide direct access to individual columns; it is not necessary to read the entire BigQuery table into the pandas DataFrame. If it so happens that the amount of data returned by the query is comfortably small, the Magics falls back automatically to the BigQuery API. Therefore, there is no harm in always using this flag in your notebook cells. To turn --use_bqstorage_api on by default in all the Magics cells within a notebook session, you can set a context flag:
import google.cloud.bigquery.magics
google.cloud.bigquery.magics.context.use_bqstorage_api = True
Query performance depends on where the table data is stored, and in what format. In general, the fastest performance is obtained if you store data in such a way that queries need to do very little seeking or type conversion.
Even though BigQuery supports querying directly from external sources such as Google Cloud Storage, Cloud Bigtable, and Google Sheets, the fastest query performance will be obtained if you use native tables.
We recommend that you use BigQuery as your analytics data warehouse for all your structured and semi-structured data. Use external data sources for staging (Google Cloud Storage), real-time ingest (Cloud Pub/Sub, Cloud Bigtable), or transactional updates (Cloud SQL, Cloud Spanner). Then, as described in Chapter 4, set up a periodic data pipeline to load the data from these external sources into BigQuery.
If your use case is such that you need to query the data from Google Cloud Storage, store it in a compressed, columnar format (e.g., Parquet) if you can. Use row-based formats such as JSON or comma-separated values (CSV) only as a last resort.
If you are loading data into BigQuery by staging it in Google Cloud Storage first, consider deleting the Google Cloud Storage data after the data is loaded. If you are performing ETL on the data to load it into BigQuery (so that the data in BigQuery is heavily transformed or is only a subset), you might want to retain the raw data in Google Cloud Storage. In such cases, reduce costs by creating life cycle rules on buckets, to downgrade the Google Cloud Storage storage class.
To enable life cycle management on a bucket so that data in multiregional or standard classes older than 30 days is moved to Nearline Storage, and Nearline data older than 90 days is moved to Coldline Storage:
gsutil lifecycle set lifecycle.yaml gs://some_bucket/
In this example, the file lifecycle.yaml contains this content:
{
  "lifecycle": {
    "rule": [
      {
        "action": {
          "type": "SetStorageClass",
          "storageClass": "NEARLINE"
        },
        "condition": {
          "age": 30,
          "matchesStorageClass": ["MULTI_REGIONAL", "STANDARD"]
        }
      },
      {
        "action": {
          "type": "SetStorageClass",
          "storageClass": "COLDLINE"
        },
        "condition": {
          "age": 90,
          "matchesStorageClass": ["NEARLINE"]
        }
      }
    ]
  }
}
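If you manage several buckets, you might prefer to generate this configuration programmatically so that the age thresholds live in one place. A minimal sketch, assuming the same two rules as above (the output file name is arbitrary):

```python
import json

# Sketch: generating the life cycle configuration above programmatically.
def lifecycle_rule(target_class, age_days, from_classes):
    """One SetStorageClass rule in the format gsutil expects."""
    return {
        "action": {"type": "SetStorageClass", "storageClass": target_class},
        "condition": {"age": age_days, "matchesStorageClass": from_classes},
    }

config = {"lifecycle": {"rule": [
    lifecycle_rule("NEARLINE", 30, ["MULTI_REGIONAL", "STANDARD"]),
    lifecycle_rule("COLDLINE", 90, ["NEARLINE"]),
]}}

with open("lifecycle.json", "w") as f:
    json.dump(config, f, indent=2)
```

The generated file can then be applied with gsutil lifecycle set lifecycle.json gs://some_bucket/ as before.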
You can use life cycle management not just to change an object’s class but also to delete objects older than a certain threshold.18
One of the BigQuery public datasets is a dataset of cyclonic storms (hurricanes, typhoons, cyclones, etc.) as observed and measured by various meteorological agencies around the world. Cyclonic storms can last up to a few weeks, and observations are carried out once every three hours or so. Suppose that you want to query this dataset to find all of the storms in 2018, the maximum wind speed attained by each storm over its lifetime, and the time and location of the storm when this maximum wind speed was reached. This query pulls out the necessary information from the public dataset:
SELECT
  sid, number, basin, name,
  ARRAY_AGG(STRUCT(iso_time, usa_latitude, usa_longitude, usa_wind) 
            ORDER BY usa_wind DESC LIMIT 1)[OFFSET(0)].*
FROM `bigquery-public-data`.noaa_hurricanes.hurricanes
WHERE season = '2018'
GROUP BY sid, number, basin, name
ORDER BY number ASC
We are extracting the storm ID (sid), the storm number within the season, the basin, and the name of the storm (if named), and then finding the array of observations made of this storm, ranking the observations in descending order of wind speed, and taking the highest wind speed observed for each storm. The storms themselves are ordered by number. The result consists of 88 rows, and looks something like this:
Row | sid | number | basin | name | iso_time | usa_latitude | usa_longitude | usa_wind |
---|---|---|---|---|---|---|---|---|
1 | 2018002N09123 | 1 | WP | BOLAVEN | 2018-01-02 18:00:00 UTC | 9.7 | 117.2 | 29 |
2 | 2018003S15053 | 2 | SI | AVA | 2018-01-05 06:00:00 UTC | -17.9 | 50.0 | 93 |
3 | 2018006S13092 | 3 | SI | IRVING | 2018-01-07 18:00:00 UTC | -15.8 | 83.0 | 89 |
4 | 2018010S18123 | 4 | SI | JOYCE | 2018-01-11 18:00:00 UTC | -18.7 | 121.6 | 54 |
The query processed 41.7 MB and took 1.4 seconds. The first row is of a storm named Bolaven that reached a maximum wind speed of 29 knots on January 2, 2018, at 18:00 UTC.
Because observations are carried out by multiple meteorological agencies, it is possible to standardize this data using nested fields and store the structs in BigQuery, as follows:19
CREATE OR REPLACE TABLE ch07.hurricanes_nested AS
SELECT sid, season, number, basin, name, iso_time, nature, usa_sshs,
  STRUCT(usa_latitude AS latitude, usa_longitude AS longitude, 
         usa_wind AS wind, usa_pressure AS pressure) AS usa,
  STRUCT(tokyo_latitude AS latitude, tokyo_longitude AS longitude, 
         tokyo_wind AS wind, tokyo_pressure AS pressure) AS tokyo,
  ... AS cma, ... AS hko, ... AS newdelhi, ... AS reunion,
  ... AS bom, ... AS wellington, ... AS nadi
FROM `bigquery-public-data`.noaa_hurricanes.hurricanes
Querying this table is similar to querying the original table, except that the field names change a bit (usa.latitude instead of usa_latitude):
SELECT
  sid, number, basin, name,
  ARRAY_AGG(STRUCT(iso_time, usa.latitude, usa.longitude, usa.wind) 
            ORDER BY usa.wind DESC LIMIT 1)[OFFSET(0)].*
FROM ch07.hurricanes_nested
WHERE season = '2018'
GROUP BY sid, number, basin, name
ORDER BY number ASC
The query processes the same amount of data and takes the same time as the original query on the public dataset. Using nested fields (i.e., structs) does not change the query speed or cost, although it might make your query more readable.
Because there are multiple observations of the same storm over its life cycle, we can change the storage to have a single row per storm and store an array of observations for each storm:
CREATE OR REPLACE TABLE ch07.hurricanes_nested_track AS
SELECT sid, season, number, basin, name,
  ARRAY_AGG(
    STRUCT(
      iso_time, nature, usa_sshs,
      STRUCT(usa_latitude AS latitude, usa_longitude AS longitude, 
             usa_wind AS wind, usa_pressure AS pressure) AS usa,
      STRUCT(tokyo_latitude AS latitude, tokyo_longitude AS longitude, 
             tokyo_wind AS wind, tokyo_pressure AS pressure) AS tokyo,
      ... AS cma, ... AS hko, ... AS newdelhi, ... AS reunion,
      ... AS bom, ... AS wellington, ... AS nadi
    ) ORDER BY iso_time ASC
  ) AS obs
FROM `bigquery-public-data`.noaa_hurricanes.hurricanes
GROUP BY sid, season, number, basin, name
Notice that we are now storing sid, season, and so on as scalar columns, because they do not change over the lifetime of the storm. The remaining data, which changes for each observation, is stored as an array of structs. Here’s how querying this table now looks:20
SELECT
  number, name, basin,
  (SELECT AS STRUCT iso_time, usa.latitude, usa.longitude, usa.wind
   FROM UNNEST(obs) ORDER BY usa.wind DESC LIMIT 1).*
FROM ch07.hurricanes_nested_track
WHERE season = '2018'
ORDER BY number ASC
The result is the same, but this time the query processes only 14.7 MB (a threefold reduction in cost) and finishes in one second (a 30% improvement in speed). Why does this improvement happen? When we store the data as an array, the number of rows in the table drops dramatically (from 682,000 to 14,000),21 because there is now only one row per storm instead of one row per observation. Then, when we filter the rows by season, BigQuery is able to discard all of a storm’s observations at once, as shown in Figure 7-13.
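The row-count collapse is easy to see with a toy example. The sketch below uses a handful of made-up observations for two invented storms: grouping the flat rows into one row per storm (the ARRAY_AGG ... GROUP BY step), filtering by season so that whole storms are skipped at once, and then picking the peak-wind observation per storm.

```python
# Sketch of why the array-of-structs layout shrinks the row count.
# All data here is invented for illustration.
flat_rows = [
    {"sid": "A", "season": "2018", "iso_time": t, "wind": w}
    for t, w in [(1, 20), (2, 35), (3, 30)]
] + [
    {"sid": "B", "season": "2017", "iso_time": t, "wind": w}
    for t, w in [(1, 50), (2, 45)]
]

# Group into one row per storm, with an obs array (ARRAY_AGG ... GROUP BY).
nested = {}
for r in flat_rows:
    storm = nested.setdefault(
        r["sid"], {"sid": r["sid"], "season": r["season"], "obs": []})
    storm["obs"].append({"iso_time": r["iso_time"], "wind": r["wind"]})

assert len(flat_rows) == 5 and len(nested) == 2   # far fewer rows to scan

# Filtering by season now skips whole storms, observations and all.
hits = [s for s in nested.values() if s["season"] == "2018"]
assert len(hits) == 1 and len(hits[0]["obs"]) == 3

# Peak-wind observation per storm (the ORDER BY wind DESC LIMIT 1 trick).
peak = max(hits[0]["obs"], key=lambda o: o["wind"])
assert peak["wind"] == 35
```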
Another benefit is that we no longer need to duplicate rows of data when we have unequal levels of granularity within the same table. You can store both the granular-level individual hurricane latitude and longitude data as well as the high-level hurricane ID, name, and season data in the same table. And because BigQuery stores table data as highly compressed individual columns, you can query and process the high-level data without the cost of operating over the rows of granular data—it is now stored as an array of values per hurricane.
For example, if you simply want to query the number of storms by year, you could query just the columns you want from the details:
WITH hurricane_detail AS (
  SELECT sid, season, number, basin, name,
    ARRAY_AGG(
      STRUCT(
        iso_time, nature, usa_sshs,
        STRUCT(usa_latitude AS latitude, usa_longitude AS longitude, 
               usa_wind AS wind, usa_pressure AS pressure) AS usa,
        STRUCT(tokyo_latitude AS latitude, tokyo_longitude AS longitude, 
               tokyo_wind AS wind, tokyo_pressure AS pressure) AS tokyo
      ) ORDER BY iso_time ASC
    ) AS obs
  FROM `bigquery-public-data`.noaa_hurricanes.hurricanes
  GROUP BY sid, season, number, basin, name
)
SELECT 
  COUNT(sid) AS count_of_storms, 
  season
FROM hurricane_detail
GROUP BY season
ORDER BY season DESC
The preceding query processes 27 MB instead of the 56 MB it would have had to process had we not used nested, repeated fields.
Nested fields by themselves do not improve performance, although they can improve readability by essentially prejoining other related tables into a single location. Nested, repeated fields, on the other hand, are extremely advantageous from a performance standpoint. Consider using nested, repeated fields in your schema: they have the potential to provide a significant boost in speed and lower query costs whenever your queries filter on a column that is not nested or repeated (season, in our case).
A key drawback to nested, repeated fields is that you cannot easily stream to this table if the streaming updates involve adding elements to existing arrays—it is no longer as simple as appending a row to the table: you now need to mutate the existing row. Of course, because the hurricane data is updated with new observations, this drawback would be quite significant, and it explains why the public dataset of hurricanes does not use nested, repeated fields.
On the other hand, the public dataset of GitHub commits (bigquery-public-data.github_repos.commits) uses a nested, repeated field (repo_name) to store the list of repositories affected by a commit. This list doesn’t change over time, so using a nested, repeated field for repo_name provides a speedup to queries that filter on any other field.
Within the BigQuery public dataset of utility data is a table of polygon boundaries of US zip codes (available in bigquery-public-data.utility_us.zipcode_area) and another table of polygon boundaries of US cities (bigquery-public-data.utility_us.us_cities_area). The US zip code geometry column (zipcode_geom) is a string,24 whereas the city geometry column (city_geom) is a geography type.
From these two tables, it is possible to get a list of all the zip codes for Santa Fe, New Mexico:25
SELECT name, zipcode
FROM `bigquery-public-data`.utility_us.zipcode_area
JOIN `bigquery-public-data`.utility_us.us_cities_area
ON ST_INTERSECTS(ST_GeogFromText(zipcode_geom), city_geom)
WHERE name LIKE '%Santa Fe%'
The query took 51.9 seconds and processed 305.5 MB to produce the following:
Row | name | zipcode |
---|---|---|
1 | Santa Fe, NM | 87505 |
2 | Santa Fe, NM | 87501 |
3 | Santa Fe, NM | 87507 |
4 | Eldorado at Santa Fe, NM | 87508 |
5 | Santa Fe, NM | 87508 |
6 | Santa Fe, NM | 87506 |
Why did the query take so long? It is not because ST_INTERSECTS is expensive; it is mainly because the ST_GeogFromText function needs to compute the S2 cells26 and build a GEOGRAPHY type corresponding to each zip code.
We can change the zip code table to do this computation beforehand and store the geometry as a GEOGRAPHY type:
CREATE OR REPLACE TABLE ch07.zipcode_area AS
SELECT 
  * REPLACE(ST_GeogFromText(zipcode_geom) AS zipcode_geom)
FROM `bigquery-public-data`.utility_us.zipcode_area
SELECT * REPLACE (see the previous snippet) is a convenient way to replace a column from a SELECT * statement.
The new dataset is 131.8 MB, which is somewhat larger than the 116.5 MB of the original table. However, the trade-off is that queries against the table can take advantage of S2 coverings and become much faster. Thus, the following query finishes in 5.3 seconds (a speed increase of 10 times) and processes 320.8 MB (a slight increase in cost if you are using on-demand pricing):
SELECT name, zipcode
FROM ch07.zipcode_area
JOIN `bigquery-public-data`.utility_us.us_cities_area
ON ST_INTERSECTS(zipcode_geom, city_geom)
WHERE name LIKE '%Santa Fe%'
The performance advantages of storing geographic data as GEOGRAPHY types instead of as strings or primitives are very compelling. This is why the utility_us dataset is deprecated (it’s still publicly accessible so as not to break already-written queries). We recommend that you use the table bigquery-public-data.geo_us_boundaries.us_zip_codes, which uses GEOGRAPHY types and is kept up to date.
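The underlying trade-off is generic: pay the parsing cost once at load time rather than on every query. A toy sketch, in which the "parser" just counts how often it is invoked (real parsing builds S2-indexed geographies) and the boundary strings are made up:

```python
# Sketch of the precompute-versus-reparse trade-off behind the
# ST_GeogFromText example above. The WKT strings are placeholders.
parse_calls = 0

def parse_wkt(wkt):
    """Stand-in for ST_GeogFromText: expensive, so count invocations."""
    global parse_calls
    parse_calls += 1
    return ("GEOGRAPHY", wkt)

zip_wkts = ["POLYGON((0 0, 1 0, 1 1, 0 0))"] * 1000  # toy boundaries

# Naive approach: reparse every boundary on each of 3 queries.
for _ in range(3):
    geogs = [parse_wkt(w) for w in zip_wkts]
assert parse_calls == 3000

# Precomputed table: parse once at load time, reuse across queries.
parse_calls = 0
stored = [parse_wkt(w) for w in zip_wkts]  # like CREATE TABLE ... REPLACE(...)
for _ in range(3):
    geogs = stored                         # queries reuse the parsed form
assert parse_calls == 1000
```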
Imagine that you frequently query the london_bicycles dataset by year:
SELECT 
  start_station_name 
  , AVG(duration) AS avg_duration
FROM `bigquery-public-data`.london_bicycles.cycle_hire
WHERE EXTRACT(YEAR FROM start_date) = 2015
GROUP BY start_station_name
ORDER BY avg_duration DESC
LIMIT 5
This takes 2.8 seconds and processes 1 GB to return the stations responsible for the longest trips in 2015:
Row | start_station_name | avg_duration |
---|---|---|
1 | Mechanical Workshop Penton | 105420.0 |
2 | Contact Centre, Southbury House | 5303.75 |
3 | Stewart’s Road, Nine Elms | 4836.380090497735 |
4 | Black Lion Gate, Kensington Gardens | 4788.747908066496 |
5 | Speakers’ Corner 2, Hyde Park | 4610.192911183014 |
The query, however, must read through the entire table to find rows from 2015. In this section, we look at various ways to cut down on the size of the data being processed.
If filtering by year is very common, one way to cut down on the data being read is to store the data in multiple tables, with the name of each table suffixed by the year. This way, querying the data for 2015 does not require traversing rows corresponding to all the years but can instead just read the cycle_hire_2015 table.
Let’s go ahead and create such a table using the following:
CREATE OR REPLACE TABLE ch07eu.cycle_hire_2015 AS (
  SELECT * FROM `bigquery-public-data`.london_bicycles.cycle_hire
  WHERE EXTRACT(YEAR FROM start_date) = 2015
)
Querying this year-sharded table, the query finishes in one second (a threefold speedup) and needs to process only 345 MB (a threefold cost saving):
SELECT 
  start_station_name 
  , AVG(duration) AS avg_duration
FROM ch07eu.cycle_hire_2015
GROUP BY start_station_name
ORDER BY avg_duration DESC
LIMIT 5
Use partitioned tables and template tables (covered next) instead of manually splitting your data across multiple tables.
It is possible to use wildcards and table suffixes to search for multiple years:
SELECT 
  start_station_name 
  , AVG(duration) AS avg_duration
FROM `ch07eu.cycle_hire_*`
WHERE _TABLE_SUFFIX BETWEEN '2015' AND '2016'
GROUP BY start_station_name
ORDER BY avg_duration DESC
LIMIT 5
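The suffix matching that the wildcard performs is simple lexicographic comparison, which can be sketched as follows (table names here are illustrative):

```python
# Sketch: how `prefix*` plus _TABLE_SUFFIX BETWEEN lo AND hi selects
# which sharded tables to open. Lexicographic string comparison is used,
# which is why year suffixes like '2015' compare correctly.
tables = ["cycle_hire_2014", "cycle_hire_2015",
          "cycle_hire_2016", "cycle_hire_2017"]

def match_suffix(tables, prefix, lo, hi):
    """Emulate FROM `prefix*` WHERE _TABLE_SUFFIX BETWEEN lo AND hi."""
    return [t for t in tables
            if t.startswith(prefix) and lo <= t[len(prefix):] <= hi]

selected = match_suffix(tables, "cycle_hire_", "2015", "2016")
assert selected == ["cycle_hire_2015", "cycle_hire_2016"]
```

Only the matching shards are read; the 2014 and 2017 tables contribute nothing to the bytes scanned.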
Partitioned tables allow you to store all of your related data in a single logical table but also efficiently query a subset of that data. If, for example, you store the last year’s worth of data but usually query only the last week, partitioning by time allows you to run queries that will need to scan only the last seven days’ worth of partitions. This can save orders of magnitude in query cost, slot utilization, and time.
The year-named tables discussed in the previous section are inefficient: BigQuery needs to maintain a copy of the schema and metadata for each of the sharded tables and verify permissions on each of the queried tables. Also, a single query cannot query more than 1,000 tables, and this might make querying the entire dataset difficult if your tables are sharded by date rather than by year (1,000 tables is not even three years of data if the tables are date-sharded). Furthermore, streaming into date-sharded tables can lead to the need for clock and time zone synchronization among multiple clients. The recommended best practice, therefore, is to use partitioned tables.
Partitioning and clustering are the most effective ways to reduce your query cost and improve performance. When in doubt, you should partition and cluster your tables; this enables a number of performance optimizations that are not available to unpartitioned or unclustered tables.
A partitioned table is a special table that is divided into partitions, with the partitions managed by BigQuery. We can create a partitioned version of the London cycle_hire dataset using the following:
CREATE OR REPLACE TABLE ch07eu.cycle_hire_partitioned
PARTITION BY DATE(start_date) AS
SELECT * FROM `bigquery-public-data`.london_bicycles.cycle_hire
You can keep storage costs in check by specifying an expiration time for a partition and asking BigQuery to ensure that users are always using a partition filter (and not querying the entire table by mistake):
CREATE OR REPLACE TABLE ch07eu.cycle_hire_partitioned
PARTITION BY DATE(start_date)
OPTIONS(partition_expiration_days=1000,
        require_partition_filter=true) AS
SELECT * FROM `bigquery-public-data`.london_bicycles.cycle_hire
If you forget to set this option at the time of creating the table, you can always add it after the fact:
ALTER TABLE ch07eu.cycle_hire_partitioned
SET OPTIONS(require_partition_filter=true)
Then, to find the stations with the longest average rentals in 2015, query the partitioned table, making sure to use the partition column (start_date) in the filter clause:
SELECT 
  start_station_name 
  , AVG(duration) AS avg_duration
FROM ch07eu.cycle_hire_partitioned
WHERE start_date BETWEEN '2015-01-01' AND '2015-12-31'
GROUP BY start_station_name
ORDER BY avg_duration DESC
LIMIT 5
The query takes one second and processes only 419.4 MB, a little more than the year-sharded table (because of the need to read the start_date column), but it is still a saving over having to read the full dataset. Note, however, that there are disadvantages to formulating the query as follows:
SELECT 
  start_station_name 
  , AVG(duration) AS avg_duration
FROM ch07eu.cycle_hire_partitioned
WHERE EXTRACT(YEAR FROM start_date) = 2015
GROUP BY start_station_name
ORDER BY avg_duration DESC
LIMIT 5
This will end up processing 1 GB and will not yield any savings on the amount of data processed. To obtain the benefits of partitioning, the BigQuery runtime must be able to statically determine the partition filters.
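Static partition pruning amounts to comparing the filter against each partition's date range before any data is read, which a toy model makes clear. The partition sizes below are invented, and for brevity the sketch uses monthly partitions even though BigQuery date partitions are daily:

```python
# Sketch of partition pruning: a literal date range can be compared
# against partition keys up front; EXTRACT(YEAR ...) = 2015 cannot be
# statically analyzed, so every partition is read. Sizes are made up.
partitions = {f"2015-{m:02d}-01": 35 for m in range(1, 13)}  # MB each
partitions.update({f"2016-{m:02d}-01": 35 for m in range(1, 13)})

def scan(partitions, date_range=None):
    """Return MB read; a static date range lets whole partitions be skipped."""
    if date_range is None:            # filter not statically analyzable
        return sum(partitions.values())
    lo, hi = date_range
    return sum(mb for day, mb in partitions.items() if lo <= day <= hi)

assert scan(partitions, ("2015-01-01", "2015-12-31")) == 12 * 35  # pruned
assert scan(partitions) == 24 * 35                                # full scan
```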
It is possible to ask BigQuery to automatically partition the table based on ingestion time rather than a date/time column. To do so, use _PARTITIONTIME or _PARTITIONDATE as the partitioning column. These are pseudocolumns that refer to the ingestion time and do not actually exist in your dataset. You can, however, use these pseudocolumns in your queries to restrict the rows being scanned.
If you are streaming to an ingestion-time-partitioned table, data in the streaming buffer is held in the __UNPARTITIONED__ partition. To query data in the __UNPARTITIONED__ partition, look for NULL values in the _PARTITIONTIME pseudocolumn.
Clustering, like partitioning, is a way to instruct BigQuery to store data in a way that can allow less data to be read at query time. Whereas a partitioned table behaves similarly to a number of independent tables (one per partition), clustered tables are stored in a sorted format as a single table. This ordering allows unlimited unique values to be stored without any performance penalty, and it also means that when a filter is applied, BigQuery can skip opening any file that doesn’t contain the range of values being requested.
Clustering can be done on any primitive nonrepeated columns (INT64, BOOL, NUMERIC, STRING, DATE, GEOGRAPHY, and TIMESTAMP). Usually, you’d cluster on columns that have a very high number of distinct values, like customerId if you have millions of customers. If you have columns that aren’t as high in cardinality but are frequently used together, you can cluster by more than one column at a time. When you cluster by multiple columns, you can filter by any prefix of the clustering columns and realize the benefits of clustering.
If most of the queries on our bicycles dataset use start_station_name and end_station_name, we could optimize the storage to take advantage of this commonality in our queries by creating the table as follows:
CREATE OR REPLACE TABLE ch07eu.cycle_hire_clustered
PARTITION BY DATE(start_date)
CLUSTER BY start_station_name, end_station_name
AS (
  SELECT * FROM `bigquery-public-data`.london_bicycles.cycle_hire
)
Then queries that use the clustering columns in order could experience a significant benefit—for example:
SELECT 
  start_station_name 
  , end_station_name 
  , AVG(duration) AS duration
FROM ch07eu.cycle_hire_clustered
WHERE start_station_name LIKE '%Kennington%'
AND end_station_name LIKE '%Hyde%'
GROUP BY start_station_name, end_station_name
But in this case, the entire table is only 1.5 GB and fits into a single block, thus there is no improvement.
To see the benefits of clustering, we must use a larger table. Our colleague Felipe Hoffa has conveniently created a clustered table of 2.20 TB of Wikipedia views, where the clustering was as follows:
CLUSTER BY wiki, title
As long as we use wiki (and, optionally, title) in our queries, we will gain the benefits of clustering. For example, we can search English Wikipedia for the number of page views in June 2017 for articles whose titles contained the term “Liberia”:
SELECT title, SUM(views) AS views
FROM `fh-bigquery.wikipedia_v3.pageviews_2017`
WHERE DATE(datehour) BETWEEN '2017-06-01' AND '2017-06-30'
AND wiki = 'en'
AND title LIKE '%Liberia%'
GROUP BY title
This query took 4.8 seconds and processed 38.6 GB. Had the table not been clustered (only partitioned),27 the query would have taken 25.9 seconds (five times slower) and processed 180.2 GB (five times costlier). On the other hand, a query that doesn’t first filter by wiki will not experience any benefits.
Partitioned tables are partitioned by date (whether it is a column or it is by ingestion time). If you need hour-level partitioning, one option is to use date-based partitioning and then cluster by hour along with whatever other attributes are appropriate.
So a common pattern is to cluster by the same column as you partition by. For example, if you partition by event_time, a timestamp at which your log events occur, this will let you do very fast and efficient queries over arbitrary time periods that are smaller than the day boundary used in the partitioning. You could, for instance, query over only the last 10 minutes of data, and you wouldn’t need to scan anything older than that.
In a partitioned table, each partition consists of a single day of data, and BigQuery maintains the metadata necessary to ensure that queries, load jobs, and Data Definition Language (DDL)/DML statements all take advantage and maintain the integrity of the partitions.
In a clustered table, BigQuery sorts the data based on the values in the clustering columns and organizes them into storage blocks that are optimally sized for efficient scanning and discarding of unnecessary data. However, unlike with partitioning, BigQuery does not maintain the sorting of data within clusters as data is streamed to it. BigQuery will recluster the data periodically in order to maintain efficient data pruning and scan speed. You can see how efficiently clustered the table is by looking at the clustering_ratio of the table (1.0 is completely optimal).
Updating a table using DML forces a recluster of the partition being updated. For example, assume that you periodically receive a table of corrections (which consist of cycle hires that were somehow missed in the previous updates). Using a MERGE statement such as the following will cause a recluster of any partitions that are updated:
MERGE ch07eu.cycle_hire_clustered all_hires
USING ch07eu.cycle_hire_corrections some_month
ON all_hires.rental_id = some_month.rental_id
WHEN NOT MATCHED THEN
  INSERT (rental_id, duration, ...) VALUES (rental_id, duration, ...)
If you don’t want to wait until BigQuery gets around to reclustering the table into which you have streamed updates, you can take advantage of the ability of DML statements to force a recluster. For example, you can apply a no-op UPDATE to the partitions of interest (perhaps those written in the past 24 hours):
UPDATE ch07eu.cycle_hire_clustered
SET start_station_id = 300
WHERE start_station_id = 300
AND start_date > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
Table 7-1 summarizes the differences between partitioning and clustering and might help you choose between partitioning a table based on a column and clustering the table by that column.
 | Partitioning | Clustering |
---|---|---|
Distinct values | Less than 10,000 | Unlimited |
Data management | Like a table (can expire, delete, etc.) | DML only |
Dry run cost | Precise | Upper bound |
Final cost | Precise | Block level (difficult to predict) |
Maintenance | None (exact partitioning happens immediately) | Background (impact might be delayed until background clustering occurs) |
Remember that we mentioned that running SELECT * ... LIMIT 10 in BigQuery is an antipattern because it ends up billing you for a full scan of the table. For clustered tables, this is not true. When you’re reading from a clustered table, BigQuery will pass along any optimizations that can be done to prevent reading data. So if you do SELECT * ... LIMIT 10 on a clustered table, the execution engine will be able to stop reading data as soon as 10 rows have been returned. Because of how the query engine employs a number of parallel workers, any of which could happen to finish first, the amount of data that is scanned is not deterministic. On the plus side, you will end up with queries that cost you much less on large tables.
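The early stop behaves like a scan that yields rows block by block and halts once the LIMIT is satisfied, which the following toy model illustrates (block and table sizes are invented):

```python
# Sketch of "early stop" on a clustered table: blocks after the LIMIT
# is satisfied are never opened, so they are never billed.
blocks = [[f"row{b}_{i}" for i in range(100)] for b in range(50)]
blocks_read = 0

def limited_scan(blocks, limit):
    """Yield rows block by block, stopping as soon as `limit` is reached."""
    global blocks_read
    out = []
    for block in blocks:
        blocks_read += 1
        for row in block:
            out.append(row)
            if len(out) == limit:   # stop: remaining blocks are untouched
                return out
    return out

rows = limited_scan(blocks, 10)
assert len(rows) == 10
assert blocks_read == 1             # 49 of the 50 blocks were never opened
```

In the real service multiple workers scan in parallel, so exactly how many blocks get opened before the first worker satisfies the LIMIT varies from run to run, as noted above.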
A surprising side effect of the “early stop” cost reduction is that you can get performance benefits even if you don’t filter on the clustering columns—if you filter by columns that are correlated to your clustering columns, BigQuery might be able to read less data!
Suppose that you have a table with two columns: zip_code and state. You cluster based on zip_code, which means that the data is sorted by zip_code when it is stored on disk. In the United States, there is a correlation between state and zip_code because zip code ranges are assigned geographically (00000 in the Northeast and 99999 in the Northwest). If you run a query that filters by state, even though you’re not filtering by the clustering column, BigQuery skips any data block that doesn’t have that state, and you’ll end up paying less for the query.
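The mechanism is per-block column metadata: each block records the min/max of every column, and because the state values track the zip_code sort order, many blocks' state ranges exclude the filter value. A sketch with invented block metadata:

```python
# Sketch: pruning on a correlated, non-clustering column. Blocks are
# sorted by zip_code; the state min/max stored per block is what lets
# BigQuery skip blocks. Zip ranges and states here are illustrative.
blocks = [
    {"zips": ("00601", "19999"), "state": ("MA", "PA")},
    {"zips": ("20000", "59999"), "state": ("DC", "MT")},
    {"zips": ("60000", "99999"), "state": ("AK", "WA")},
]

def blocks_for_state(blocks, state):
    """Skip any block whose stored state min/max cannot contain the value."""
    return [b for b in blocks if b["state"][0] <= state <= b["state"][1]]

# Filtering on state alone still prunes, even though the table is
# clustered on zip_code: "NM" falls outside the middle block's range.
assert len(blocks_for_state(blocks, "NM")) < len(blocks)
```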
When a table is clustered, it allows BigQuery to apply a number of performance optimizations that are not possible with nonclustered tables. One of these optimizations is designed for star schemas, which let you filter based on constraints in a dimension table. For example, suppose that you have a fact table containing orders, which is clustered by customer_id, and a dimension table containing customers, and you run the following query:
SELECT o.* 
FROM orders o 
JOIN customers c USING (customer_id)
WHERE c.name = "Changying Bao"
Ordinarily, this query would need to scan the full orders table in order to execute the join. But because the orders table is clustered by the column that is being used for the join, the righthand side of the join—querying customers and finding the matching customer IDs—is done first. Then the second part of the query just needs to look up the clustering column that matches the customer_id. From a bytes-scanned perspective, it is as if BigQuery ran the following queries in parallel:
// First look up the customer ID.
// This scans only the small dimension table.
SET id = SELECT customer_id FROM customers WHERE name = "Changying Bao"

// Next look up the customer from the orders table.
// This will filter by the cluster column,
// and so only needs to read a small amount of data.
SELECT * FROM orders WHERE customer_id = $id;
In short, clustering is highly recommended if you want to reduce query costs and improve performance.
BigQuery is designed to minimize the time taken to derive insights from data. Consequently, we can carry out ad hoc, interactive analytics on large datasets and stream in updates to those datasets in near real time. Sometimes, though, you might be less time sensitive. Perhaps all you need are nightly reports. In such cases, you might be willing to have your queries be queued up and executed when possible, and this might be sufficient for the reports to reflect data as of an hour ago.
You can submit a set of queries, called batch queries, to the service, and they will be queued on your behalf and started when idle resources are available. At the time they were introduced, in 2012, batch queries provided a pricing benefit. However, as of this writing, both interactive and batch queries cost the same. If you use flat-rate pricing, both batch queries and interactive queries share your allocated slots.
Because batch queries are not less expensive and use the same reservations and slots as interactive resources, the primary reason that you might want to employ batch queries is that they don’t count toward your concurrent rate limit and can make scheduling hundreds of queries easier.
There are a number of rate limits that affect interactive (i.e., nonbatch) queries. For example, you might be able to have at most 50 queries running concurrently,28 with concurrent byte limits and “large query” limits. If those limits are reached, the query will fail immediately. This is because BigQuery assumes that an interactive query is something the user needs to run immediately. When you use batch queries, on the other hand, if you ever reach a rate limit, the queries will be queued and retried later. There are still similar rate limits, but they operate separately from interactive rate limits, so your batch queries won’t affect your interactive ones.
One example might be that you have periodic queries that you run daily or hourly to build dashboards. Maybe you have 500 queries that you want to run. If you try to run them all at once as interactive, some will fail because of concurrent rate limits. Additionally, you don’t necessarily want these queries to interfere with other queries you are running manually from the BigQuery web UI. So you can run the dashboard queries at batch priority, and the other queries will run normally as interactive.
To use batch queries, provide the --batch flag to the bq command-line tool, or specify the job priority as BATCH, not INTERACTIVE, in the console or the REST API. If BigQuery hasn’t started the query within 24 hours, it changes the job priority to interactive. However, the typical wait time before a query starts is on the order of minutes, unless you are submitting more queries than your quota of concurrent requests, in which case queries will be run after earlier ones complete.
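As a sketch, the request body you would POST to the REST API's jobs.insert endpoint for a batch-priority query looks like the following (the query text is a placeholder; this only constructs the JSON payload, it does not call the service):

```python
import json

# Build the jobs.insert request body for a batch-priority query.
# The query text here is an illustrative placeholder.
job_body = {
    "configuration": {
        "query": {
            "query": "SELECT COUNT(*) FROM `bigquery-public-data.samples.shakespeare`",
            "useLegacySql": False,
            "priority": "BATCH",  # instead of the default "INTERACTIVE"
        }
    }
}

print(json.dumps(job_body, indent=2))
```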
If you care about minimizing the “time to insight” or wish to have the simplest possible data pipeline, we strongly encourage you to use streaming inserts into BigQuery via Cloud Pub/Sub and Cloud Dataflow. This architecture provides ingest speeds on the order of 100,000 rows per second, even on small Cloud Dataflow clusters. You can horizontally scale the Dataflow cluster by adding more machines and achieve several million rows per second of ingest performance without having to tweak any knobs.29
Streaming ingests incur charges, whereas load jobs are free. In some scenarios, you might be willing to trade off a few minutes’ latency for reduced ingestion costs. In that case, consider using file loads instead of streaming inserts. You can do that from Apache Beam’s BigQueryIO using the following:

BigQueryIO.writeTableRows()
    .to("project-id:dataset-id.table-id")
    .withCreateDisposition(
        BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    .withMethod(Method.FILE_LOADS)
    .withTriggeringFrequency(Duration.standardSeconds(600))
    .withNumFileShards(10)
    .withSchema(new TableSchema()...)
    .withoutValidation()
The previous code snippet writes to BigQuery every 10 minutes using file loads, thus avoiding the streaming ingestion charge. File loads can scale to 300,000 rows per second on medium-sized Dataflow clusters. However, you should be aware that computing and finalizing windows does take time, and so there will be a latency on the order of a few minutes. Because of per-table and per-project load quotas, and because failures and retries count against quota, we recommend that you do file loads no more frequently than every five minutes.
In this chapter, we looked at ways to control costs, by using the dry_run feature and by setting up limits on the number of bytes billed. Then we examined ways to measure query speed using the REST API or a custom measurement tool.
We also covered several methods for increasing query speed. To minimize I/O overhead, we recommended ways to reduce the data being read. We looked at the different entities that can be cached, from previous query results to intermediate results and entire tables in memory. We examined how to do joins efficiently, by avoiding self-joins, reducing the data being joined, and taking advantage of precomputed values. By limiting large sorts, safeguarding against data skew, and optimizing user-defined functions, we can minimize the chances of slots getting overwhelmed. We also recommended the use of approximate aggregation functions, including count and top functions.
Finally, we looked at ways of optimizing data storage and speeding up data access. We suggested that our applications be refactored to accept compressed, partial responses, send requests in batches, and perform bulk reads using the Storage API. We discovered that storing data as arrays of structs and as geography types brings performance advantages. We also looked at different ways of reducing the size of data being scanned, namely through partitioning and clustering.
We’d like to end this chapter the way we began, by recommending that, although these performance improvements can be significant, you verify that they apply to your workflow. If your query is running slow, work through a checklist along these lines: reduce the data being read; take advantage of caching; perform joins efficiently; avoid large sorts and data skew that can overwhelm individual slots; use approximate aggregation functions where exactness is not required; and consider partitioning and clustering your tables.
1 This quote is from his 1974 article “Structured Programming with go to Statements.” You can download a PDF of the article from http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.103.6084.
2 The difference can be difficult to measure precisely because BigQuery is a service, and network overhead to reach the service and the load on the service tends to vary. You might need to run the query many times to get a good estimate of the speed.
3 A slot is a unit of computational capacity required to execute SQL queries. BigQuery automatically calculates how many slots are required by each query. See Chapter 6 for more details.
4 Nonparallelizable operations might not add to the cost, because the remaining slots can presumably address other workloads.
5 See https://cloud.google.com/bigquery/docs/sandbox for further details.
6 See 07_perf/time_query.sh in the GitHub repository for this book.
7 See 07_perf/install_workload_tester.sh in the GitHub repository for this book.
8 See 07_perf/time_bqwt.sh in the GitHub repository for this book.
9 This is 07_perf/get_job_details.sh in the GitHub repository for this book. You can get the required job ID from the “Query history” in the BigQuery web UI. If your query was run outside the US and EU, you also need to specify the job location in a job resource object (sorry!). You can also get the job details by using bq ls -j.
10 If you are reading this in a format that renders the diagram in grayscale, please try out the query and look at the query details in the BigQuery web UI.
11 You will need to measure this, of course. In some cases, the extra overhead involved in reading the table of intermediate results will make this more expensive than simply recomputing the results of a WITH clause.
12 You can verify that this is the key reason for the incorrect value for num_babies by adding state and year to the USING clause (and making sure to add the two fields to the first two selects). Then the number of babies is in the correct ballpark (e.g., 2,018,162 for Jessie, whereas the correct answer is 229,263). The answer is still incorrect because rows with NULLs for these fields are ignored by the join (NULL is never equal to anything else).
13 In bicycling and running, pace is the inverse of speed.
14 This is a current limitation of the BigQuery dynamic execution runtime; it might be eased in the future.
15 This is 07_perf/get_job_details_compressed.sh in the GitHub repository for this book.
16 This is 07_perf/get_recent_jobs.sh in the GitHub repository for this book.
17 You can if you want to, however; see https://cloud.google.com/bigquery/docs/reference/storage/samples. The endpoint for the BigQuery Storage API is different from that of the BigQuery REST API—it’s bigquerystorage.googleapis.com.
18 For more details, see https://cloud.google.com/storage/docs/managing-lifecycles#change_an_objects_storage_class.
19 You can find all queries in this section in 07_perf/hurricanes.sql in the GitHub repository for this book.
20 Chapter 2 covers SQL syntax for working with arrays.
21 Because the hurricanes dataset is continually refreshed, the row numbers might be larger when you are reading this.
22 The dataset is bigquery-public-data.google_analytics_sample.ga_sessions_20170801.
23 This is in 07_perf/google_analytics.sql in the GitHub repository for this book.
24 This is actually a deprecated dataset for reasons that will become apparent in this section. The more up-to-date data is bigquery-public-data.geo_us_boundaries.us_zip_codes; that dataset does use geography types.
25 Chapter 4 covers geographic types and GIS functions.
26 See https://oreil.ly/PkIsx.
27 Try it using fh-bigquery.wikipedia_v2.pageviews_2017.
28 This is the default, but you can request these quotas to be increased. See https://cloud.google.com/bigquery/quotas.
29 Because streaming ingests are charged for, you might have to ask for a quota increase. Check the default quotas at https://cloud.google.com/bigquery/quotas#streaming_inserts and request additional streaming quota from the GCP console.