Chapter 9. Federated Queries

In {{Chapter X}} we discussed the various connectors supported in Presto today, with deeper dives and example walkthroughs for several of the popular ones. Over the last decade the amount of data, the types of data, and the number of data sources have grown dramatically. No single solution covers every use case, so organizations use the tools best suited to their data and workloads. In this chapter we will learn about query federation using Presto.

One size does not fit all, however, and there is an increasing need to provide a single consumption layer over the disparate data sources in organizations. The number of systems in many organizations has grown to a point where it’s becoming impractical, increasingly complex, and costly to manage moving or copying data across systems. By providing this single consumption layer, users can interact with their data from a single place and still leverage the capabilities of the underlying systems. For example, they can leverage data captured in Hadoop or cloud storage (S3, ADLS, GCS) and make further use of it when combined with data in their relational databases.

Presto fills this need by allowing you to access data across a set of disparate data sources. To the end user, everything looks as though it lives in a single database that holds all of the organization’s data. Using Presto, you can federate across these data sources in the same query. Throughout the chapter you’ll learn how to write federated SQL queries in Presto.

Query Federation in Presto

We will go through a use case of joining data in distributed storage with data in a relational database management system. We’ve provided the necessary setup in the book’s accompanying GitHub repository, and we have made the data public so that you can try the examples yourself once you’ve finished reading this chapter.

For our federation example, we are going to use airline data that we’ve curated from FlightAware¹ data, which in turn is collected from the FAA. Each row in the data set represents either a departure or an arrival of a flight at an airport in the United States. The schema is fairly large, so we’ve shown just a subset of the columns.

flightdate | airlineid | origin | dest | arrtime | deptime

With this data you can ask questions such as “What is the average delay of airplanes by year?”

SELECT avg(depdelayminutes) AS delay, year 
FROM flights_orc
GROUP BY year
ORDER BY year DESC;

Or “What are the best days of the week to fly out of Boston in the month of February?”

SELECT dayofweek, avg(depdelayminutes) AS delay 
FROM flights
WHERE month=2 AND origincityname LIKE '%Boston%'
GROUP BY dayofweek
ORDER BY dayofweek;

As we mentioned earlier in the chapter, we have provided the DDL, access to the data, and additional queries in the GitHub repository. Because the notion of multiple data sources and query federation is an integral part of Presto, we encourage you to set up an environment and explore the data. We’ve included many additional analytical queries to run, and these should also serve as inspiration for queries of your own. Feel free to open a pull request to add to our collection.

We will use two example analytical queries on the airline data to demonstrate query federation in Presto. The setup we provide uses data stored in AWS S3 and accessed by configuring Presto with the Hive connector. If you prefer, you can also store the data in HDFS, Azure Storage, or Google Cloud Storage and use the Hive connector to query it. {{Chapter X}} covers using Presto in cloud environments and querying from cloud storage.

In this first example query, we want Presto to return the top ten airline carriers with the most flights.

presto:ontime> SELECT uniquecarrier, count(*) AS ct 
FROM flights_orc
GROUP BY uniquecarrier
ORDER BY count(*) DESC
LIMIT 10;
 uniquecarrier |    ct
---------------+----------
 WN            | 24096231
 DL            | 21598986
 AA            | 18942178
 US            | 16735486
 UA            | 16377453
 NW            | 10585760
 CO            |  8888536
 OO            |  7270911
 MQ            |  6877396
 EV            |  5391487
(10 rows)

While the above query gives us the top ten airline carriers with the most flights, it requires you to understand the values of uniquecarrier. It would be better to have a more descriptive column with the full airline carrier name instead of the abbreviation. The airline data source we are querying does not contain that information. If another data source does, we could combine the two to return more comprehensible results.

Let’s look at another example where we want Presto to return the top ten airports that had the most departures.

presto:ontime> SELECT origin, count(*) AS ct
FROM flights_orc
GROUP BY origin
ORDER BY count(*) DESC
LIMIT 10;
 origin |   ct
--------+---------
 ATL    | 8867847
 ORD    | 8756942
 DFW    | 7601863
 LAX    | 5575119
 DEN    | 4936651
 PHX    | 4725124
 IAH    | 4118279
 DTW    | 3862377
 SFO    | 3825008
 LAS    | 3640747
(10 rows)

As with the previous query, the results require some domain expertise. For example, you may know that the origin column contains airport codes, but the codes will be meaningless to people with less expertise analyzing the results. In the next section, we will show you how to obtain more meaning from these example queries by writing a federated query that combines our airline data source with another data source.

Writing Federated Queries

In the previous section we provided two example queries to answer the following questions:

  • What are the top ten airline carriers with the most flights?

  • What are the top ten airports that had the most departures?

By querying our airline data source, we were able to obtain the results. However, it was somewhat difficult to extract meaning from two of the columns. In the following examples we will show how to give the results more meaning by combining them with additional data in a relational database. We chose PostgreSQL for our examples, but the same approach applies to any relational database.

As with the airline data, our GitHub repository includes the setup for creating and loading tables in a relational database as well as configuring the Presto connector to access it. We’ve chosen to configure Presto to query a PostgreSQL database that contains additional airline data. One table in PostgreSQL, carrier, provides a mapping of the airline code to the more descriptive airline name. We will use this additional data with our first example query. Let’s take a look at the carrier table in PostgreSQL.

presto:public> SELECT * FROM carrier LIMIT 10;
 code |                 description
------+----------------------------------------------
 02Q  | Titan Airways
 04Q  | Tradewind Aviation
 05Q  | Comlux Aviation, AG
 06Q  | Master Top Linhas Aereas Ltd.
 07Q  | Flair Airlines Ltd.
 09Q  | Swift Air, LLC
 0BQ  | DCA
 0CQ  | ACM AIR CHARTER GmbH
 0GQ  | Inter Island Airways, d/b/a Inter Island Air
 0HQ  | Polar Airlines de Mexico d/b/a Nova Air
(10 rows)

This looks promising. You’ll see that there is a column code along with a description of the code. Using this information, we can use our first example query for the flights_orc table and modify it to join with the data in the PostgreSQL carrier table.

presto:ontime> SELECT f.uniquecarrier, c.description, count(*) AS ct 
FROM hive.ontime.flights_orc f,
postgres.public.carrier c
WHERE c.code = f.uniquecarrier
GROUP BY f.uniquecarrier, c.description
ORDER BY count(*) DESC
LIMIT 10;
 uniquecarrier |        description         |    ct
---------------+----------------------------+----------
 WN            | Southwest Airlines Co.     | 24096231
 DL            | Delta Air Lines Inc.       | 21598986
 AA            | American Airlines Inc.     | 18942178
 US            | US Airways Inc.            | 16735486
 UA            | United Air Lines Inc.      | 16377453
 NW            | Northwest Airlines Inc.    | 10585760
 CO            | Continental Air Lines Inc. |  8888536
 OO            | SkyWest Airlines Inc.      |  7270911
 MQ            | Envoy Air                  |  6877396
 EV            | ExpressJet Airlines Inc.   |  5391487
(10 rows)

Voilà! With a single SQL query that federates data from S3 and PostgreSQL, the results are now much easier to interpret. Instead of having to know or separately look up the airline codes, readers see the descriptive airline name in the results.

In the query we wrote, the additions are the description column and the join to the postgres table carrier. We also used fully qualified names when referencing the tables. When you use the USE command to set the default catalog and schema, an unqualified table name resolves against that catalog and schema. Any time you need to query outside of the default catalog and schema, however, the table name must be qualified; otherwise Presto will look for the table within the default catalog and schema and return an error if it is not there. A table within the default catalog and schema does not need to be fully qualified, but in a federated query it is good practice to qualify every table name so that it is clear which data source each table comes from.
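As a minimal sketch of how name resolution behaves, assume the hive and postgres catalogs from this chapter’s setup, with hive.ontime set as the default through USE. The last statement assumes that no carrier table exists in hive.ontime; the exact error text depends on your Presto version, so none is shown here.

```sql
-- Set the default catalog and schema for the session.
USE hive.ontime;

-- Unqualified name: resolved as hive.ontime.flights_orc.
SELECT count(*) FROM flights_orc;

-- Outside the default catalog and schema: must be fully qualified.
SELECT count(*) FROM postgres.public.carrier;

-- Unqualified carrier is looked up as hive.ontime.carrier.
-- Assumption: that table does not exist, so Presto reports an error.
SELECT count(*) FROM carrier;
```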

Next, let’s look at another table airport in PostgreSQL. This table will be used as part of federating our second example query.

presto:public> SELECT code, name, city FROM airport LIMIT 10;
 code |           name           |         city
------+--------------------------+----------------------
 01A  | Afognak Lake Airport     | Afognak Lake, AK
 03A  | Bear Creek Mining Strip  | Granite Mountain, AK
 04A  | Lik Mining Camp          | Lik, AK
 05A  | Little Squaw Airport     | Little Squaw, AK
 06A  | Kizhuyak Bay             | Kizhuyak, AK
 07A  | Klawock Seaplane Base    | Klawock, AK
 08A  | Elizabeth Island Airport | Elizabeth Island, AK
 09A  | Augustin Island          | Homer, AK
 1B1  | Columbia County          | Hudson, NY
 1G4  | Grand Canyon West        | Peach Springs, AZ
(10 rows)

Looking at this PostgreSQL table, you’ll see that there is a column code that can be used to join with our second query on the flights_orc table. Doing so lets us bring the additional information in the airport table into our query to produce a more detailed and understandable report.

presto:ontime> SELECT f.origin, c.name, c.city, count(*) AS ct
FROM hive.ontime.flights_orc f,
postgres.public.airport c
WHERE c.code = f.origin
GROUP BY origin, c.name, c.city
ORDER BY count(*) DESC
LIMIT 10;
 origin |                   name                   |         city          |   ct
--------+------------------------------------------+-----------------------+---------
 ATL    | Hartsfield-Jackson Atlanta International | Atlanta, GA           | 8867847
 ORD    | Chicago O'Hare International             | Chicago, IL           | 8756942
 DFW    | Dallas/Fort Worth International          | Dallas/Fort Worth, TX | 7601863
 LAX    | Los Angeles International                | Los Angeles, CA       | 5575119
 DEN    | Denver International                     | Denver, CO            | 4936651
 PHX    | Phoenix Sky Harbor International         | Phoenix, AZ           | 4725124
 IAH    | George Bush Intercontinental/Houston     | Houston, TX           | 4118279
 DTW    | Detroit Metro Wayne County               | Detroit, MI           | 3862377
 SFO    | San Francisco International              | San Francisco, CA     | 3825008
 LAS    | McCarran International                   | Las Vegas, NV         | 3640747
(10 rows)

Presto! As with our first example, federating across two disparate data sources lets us return more meaningful information. Here we are able to add the name of the airport instead of having the user rely on airport codes that are hard to interpret.

Extract, Transform, Load

Extract, Transform, Load (ETL) is a term used to describe the technique of copying data from data sources and landing it in another data source. Often there is a middle step that transforms the source data in preparation for the destination. This may include dropping columns, feature engineering, joining in data, pre-aggregation, and other ways of making the data suitable for querying at the destination.

Presto is not intended to be a full-fledged ETL tool comparable to a commercial solution. However, it can help you avoid ETL altogether. Because Presto can query the data source directly, there may no longer be a need to move the data; Presto queries the data where it lives, which removes the complexity of managing an ETL process.

You may still want to do some type of ETL transformation. Perhaps you want to query pre-aggregated data, or you don’t want to put stress on the underlying system. By using the CREATE TABLE AS or INSERT SELECT constructs, you can move data from one data source into another, such as HDFS or S3.
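The following is a minimal sketch of both constructs, assuming the hive and postgres catalogs used in this chapter. The target table names carrier_flight_counts and carrier_copy are hypothetical and are not part of the book’s repository, and the ORC format property is only one possible choice for the Hive connector.

```sql
-- Materialize a pre-aggregated, federated result as a new Hive table.
-- carrier_flight_counts is a hypothetical table name.
CREATE TABLE hive.ontime.carrier_flight_counts
WITH (format = 'ORC')
AS
SELECT f.uniquecarrier, c.description, count(*) AS ct
FROM hive.ontime.flights_orc f
JOIN postgres.public.carrier c ON c.code = f.uniquecarrier
GROUP BY f.uniquecarrier, c.description;

-- Copy the PostgreSQL lookup table into the Hive catalog.
-- carrier_copy is a hypothetical table name.
CREATE TABLE hive.ontime.carrier_copy AS
SELECT code, description
FROM postgres.public.carrier;

-- Later, append only the carriers that are not yet in the copy.
INSERT INTO hive.ontime.carrier_copy
SELECT c.code, c.description
FROM postgres.public.carrier c
WHERE c.code NOT IN (SELECT code FROM hive.ontime.carrier_copy);
```

Queries against carrier_flight_counts then read only the stored aggregate from distributed storage and no longer touch PostgreSQL, which is the trade-off this approach makes: less load on the source system in exchange for data that can become stale.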

Federated Architecture

Now that we’ve gone through some examples of query federation from an end-user perspective, let’s discuss the architecture of how this works. We will build on some of the concepts we learned about in {{Chapter X}} on the Presto architecture.

Presto is able to coordinate the hybrid execution of a query across the data sources involved. In the earlier example, we queried across distributed storage and PostgreSQL. For distributed storage via the Hive connector, Presto reads the data files directly, whether they are in HDFS, S3, Azure Blob Storage, or elsewhere. For a relational database connector such as the PostgreSQL connector, Presto relies on PostgreSQL to do part of the execution. Let’s use our query from earlier, but to make it more interesting let’s add a new predicate that refers to a column in the PostgreSQL airport table.

SELECT f.origin, c.name, c.city, count(*) AS ct
FROM hive.ontime.flights_orc f,
postgres.public.airport c
WHERE c.code = f.origin AND c.state = 'AK'
GROUP BY origin, c.name, c.city
ORDER BY count(*) DESC
LIMIT 10;

The logical query plan will resemble the following figure. The plan consists of scanning both the flights_orc and airport tables, and both inputs are fed into the join operator. Before the airport data is fed into the join, a filter is applied because we only want results for airports in Alaska. After the join, the aggregation and grouping operation is applied. Finally, the TopN operator performs the ORDER BY and LIMIT combined.

Figure 9-1. This logical query plan in Presto shows how data is federated across two data sources using the Hive and PostgreSQL connectors.
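If you want to inspect the plan on your own cluster, one option is to prefix the query with EXPLAIN. This is a sketch that assumes the same hive and postgres catalogs as the rest of the chapter; the plan text varies by Presto version and configuration, so no output is reproduced here.

```sql
-- Print the query plan instead of executing the query.
EXPLAIN
SELECT f.origin, c.name, c.city, count(*) AS ct
FROM hive.ontime.flights_orc f,
     postgres.public.airport c
WHERE c.code = f.origin AND c.state = 'AK'
GROUP BY f.origin, c.name, c.city
ORDER BY count(*) DESC
LIMIT 10;
```

Using EXPLAIN (TYPE DISTRIBUTED) instead shows how the same plan is broken into fragments for distributed execution.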

In order for Presto to retrieve the data from PostgreSQL, it will send a query via JDBC. For example, in the naive approach the following query is sent to PostgreSQL.

SELECT * FROM public.airport

However, Presto is smarter than this: the Presto optimizer tries to reduce the amount of data transferred between systems. In this example, Presto queries only the columns it needs from the PostgreSQL table and pushes the predicate down into the SQL that is sent to PostgreSQL.

So now the query sent from Presto to PostgreSQL will push more processing to PostgreSQL.

SELECT code, city, name FROM public.airport WHERE state = 'AK'

As the PostgreSQL connector returns data to Presto over JDBC, Presto continues processing the part of the query that is executed in the Presto query engine.

Figure 9-2. Using the PostgreSQL connector, Presto pushes filter processing down into PostgreSQL, which helps query performance by retrieving only the data Presto needs.

Some simpler queries, such as SELECT * FROM public.airport, would be pushed down entirely into the underlying data source, such that most of the query execution happens outside of Presto and Presto acts as a passthrough.

Currently, more complex SQL pushdown is not supported. For example, aggregations or joins that involve only the RDBMS data could be pushed into the RDBMS to reduce data transfer to Presto, but today they are executed in Presto. The Presto community has been discussing this, and it is something Presto is expected to support in time.
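To make the limitation concrete, consider an aggregation that touches only the PostgreSQL airport table. The statements below are an illustrative sketch based on the behavior described in this section; the SQL that the connector actually generates may differ in form and across Presto versions.

```sql
-- Issued to Presto by the user:
SELECT state, count(*) AS airports
FROM postgres.public.airport
GROUP BY state;

-- Roughly what Presto sends to PostgreSQL without aggregation pushdown
-- (column pruning only); grouping and counting then run inside Presto:
SELECT state FROM public.airport;

-- What a full aggregation pushdown could send instead, so that only
-- one row per state travels back to Presto:
SELECT state, count(*) FROM public.airport GROUP BY state;
```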

Security in Federation

We will learn more about Presto security in general in {{Chapter X}}. For query federation, both user authentication (“who are you?”) and user authorization (“what can you do?”) are important, and there are multiple points at which authentication (“AuthN”) and authorization (“AuthZ”) take place.

Figure 9-3. Presto is designed to be a trusted system and authenticates to the relational databases. {{Chapter X}} discusses Presto Security in greater detail.

The user first authenticates to the Presto coordinator; the client can do so over LDAP or Kerberos. Because the Presto connectors issue requests to the data sources, authentication is required there as well. Authorization is enforced by Presto’s system access control, which applies at the global level.

Authentication from the connectors to the data sources depends on the connector implementation. In many connector implementations, Presto authenticates as a service user. Any query that a user runs through such a connector is therefore executed in the underlying system as that service user. To provide more fine-grained access control, you can create several service users with different permissions and set up a separate configuration of the connector for each one. Finally, you can restrict access to these connectors by leveraging the system access control for catalogs.

1 https://flightaware.com/
