Chapter 8. Connectors

A NOTE FOR EARLY RELEASE READERS

This will be the 6th chapter of the final book. Please note that the GitHub repository will be made active later on.

If you have comments about how we might improve the content and/or examples in this book, or if you notice missing material, please reach out to the author at [email protected].

Presto is a highly extensible system that allows you to process data from various data sources, even within a single query. In the Introduction and Presto Architecture chapters, we learned how Presto is able to query a variety of data sources. In this chapter, we will learn in more detail what makes Presto a SQL-on-Anything system. Over the last decade, the amount of data, the types of data, and the number of data sources have grown dramatically. There is no longer a single solution for all use cases. Organizations are using the tools best suited to their data and workloads.

One size does not fit all, however, and there is an increasing need for a single consumption layer over the disparate data sources in organizations. The number of systems in many organizations has grown to a point where it is impractical, increasingly complex, and costly to move or copy data across systems. By providing this single consumption layer, users can interact with their data from a single place and still leverage the capabilities of the underlying systems. For example, they can leverage data captured in Hadoop or cloud storage (S3, ADLS, GCS) and make further use of it by combining it with data in their relational databases.

Presto fills this need by allowing you to access data across a set of disparate data sources. We have seen that, to the end user, everything appears to be in one place, as though it were a single database. This is the work of the Presto connectors, which expose data sources as catalogs in Presto. Furthermore, using Presto, you can query across all of these various data sources in the same query. For example, you can take historical data stored in Google Cloud Storage and join it with operational data stored in PostgreSQL. Throughout the chapter, you'll also learn how to write these federated SQL queries in Presto.
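As a sketch of what such a federated query can look like, the following joins a table exposed by a catalog backed by cloud storage with a table in a PostgreSQL catalog. The catalog, schema, table, and column names here are hypothetical and only illustrate the shape of a federated query:

```sql
-- Join historical orders in cloud storage (exposed via a catalog named hive)
-- with operational customer data in a catalog named postgresql.
SELECT c.name, sum(o.total) AS lifetime_total
FROM hive.history.orders AS o
JOIN postgresql.public.customer AS c
  ON o.customer_id = c.id
GROUP BY c.name;
```

From the user's perspective, this is a single SQL query; Presto reads from both systems and performs the join itself.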

Presto's extensibility for querying multiple data sources is accomplished through its plugin interface. Presto supports a wide variety of plugins that we will learn about in {Chapter X}. The most ubiquitous plugins in Presto are connectors. Presto has an extensive SPI that allows for high-performance connectivity to any data source. More than two dozen connectors ship natively in Presto today. There are also commercial proprietary connectors that further extend the reach and performance of Presto. Last, if you have a custom data source, or one for which no connector exists, you can implement your own connector by implementing the necessary SPI calls and dropping it into the plugin directory in Presto.

The Connector Service Provider Interface (SPI) is what allows Presto to query data from relational systems, nonrelational systems, file systems, and object stores. As long as the underlying data source can be conceptually mapped to tables, columns, and rows, a Presto connector can be created. The SPI can logically be divided into four pieces. The Metadata SPI provides the metadata needed by the Presto parser and planner, such as schemas, tables, columns, column types, and data statistics. The Data Location SPI provides the location of the data needed by the Presto scheduler; in order to schedule tasks on the workers, Presto needs to tell the tasks where to get the data, and the data location may vary depending on the data source. The Data Source SPI transforms the underlying data into Presto's internal representation; it is used when reading data with SELECT. The Data Sink SPI transforms Presto's internal representation of data into that of the underlying data source; it is used when writing data as part of an INSERT or CREATE TABLE AS.

The semantics and complexity are determined and encapsulated by the connector implementation. For example, it may be straightforward to transform relational PostgreSQL data into Presto's internal format, whereas a nonrelational system such as Apache Kafka is less straightforward. We will learn more about this later in the chapter. Performance is also determined by the connector implementation. The most basic connectors make a single connection to the data source and provide the data to Presto. A more advanced parallel connector allows for better performance but is more complex to implement.

Distributed Storage

The Presto Hive connector allows Presto to read and write from distributed storage such as HDFS. However, it is not constrained to HDFS; it supports distributed storage in general. Currently, you can configure the Hive connector to use HDFS, AWS S3, Azure Storage, Google Cloud Storage, Alluxio, and S3-compatible storage. S3-compatible storage includes systems such as MinIO, Ceph, and IBM Cloud Object Storage. There are a variety of these compatible stores, but as long as they implement the S3 API and behave the same way, Presto mostly does not need to know the difference. We will learn more about cloud storage in the chapter on Presto in the Cloud.

The Hive connector's name is a bit of a misnomer, given that it does not push SQL processing to Hive. Instead, it simply uses the Hive metastore and accesses the data directly. It also assumes the Hive table format for how the data is organized in the distributed storage. The Hive connector is the most widely used and comprehensive connector and deserves a chapter of its own. We will learn more about it in the next chapter.

Relational Database Management Systems

Presto contains connectors to both open source and proprietary relational database management systems, including MySQL, PostgreSQL, AWS Redshift, and Microsoft SQL Server. Presto queries these data sources through connectors that use each system's respective JDBC driver. Let's look at a simple example using PostgreSQL. A PostgreSQL instance may consist of several databases. Each database contains schemas, which contain objects such as tables and views. When configuring Presto with PostgreSQL, you choose the database that will be exposed as a catalog in Presto.

presto> show catalogs;
Catalog
------------
system
postgresql
(2 rows)
presto> show schemas in postgresql;
Schema
------------
public
airline
(2 rows)
presto> use postgresql.airline;
presto:airline> show tables;
Table
---------
airport
carrier
(2 rows)

In the above example, you see we connected to a PostgreSQL database that contains two schemas: public and airline. Within the airline schema, there are two tables, airport and carrier. Let's try running a query. In this example, we issue a SQL query to Presto where the table exists in a PostgreSQL database. Using the PostgreSQL connector, Presto retrieves the data, processes it, and returns the results to the user.

presto:airline> select code, name from airport where code = 'ORD';

code | name
------+------------------------------
ORD | Chicago O’Hare International
(1 row)

In this example, Presto sends the entire SQL query to PostgreSQL using the PostgreSQL JDBC driver contained within the PostgreSQL connector plugin. PostgreSQL processes the query and returns the results over JDBC. The connector reads those results and writes them into Presto's internal data format. Presto continues the processing and then returns the results to the user. See Figure 8-1 for this flow.

Figure 8-1. The Presto connector connects to PostgreSQL over JDBC. The results from PostgreSQL are translated into Presto's internal format. Presto finishes the query execution and finally returns the results to the client.

Query Pushdown

As we saw in the previous example, Presto is able to offload processing by pushing the SQL statement down into the underlying data source. We refer to this as query pushdown. This is advantageous because the underlying system can reduce the amount of data returned to Presto, avoiding unnecessary memory, CPU, and network costs. Furthermore, systems like PostgreSQL may have indexes on certain filter columns, allowing for faster processing. However, it is not always possible to push the entire SQL statement down into the data source. Currently, the Presto Connector SPI limits the types of operations that can be pushed down to filters and column projections.

presto:airline> select state, count(*)
from airport
where country = 'US'
group by state;

Given the above Presto query, the PostgreSQL connector will construct the SQL query to push down to PostgreSQL.

select state
from airport
where country = 'US';

There are two important things to look at when queries are pushed down by an RDBMS connector. The columns in the SELECT list are set to exactly what is needed by Presto; in this case, we only need the state column for processing the GROUP BY in Presto. We also push down the filter country = 'US', which means we do not need to project the country column for further processing by Presto. You'll notice that the aggregation is not pushed down to PostgreSQL. This is because Presto is not able to push any other forms of processing down, and the aggregation must be performed in Presto today. This can still be advantageous, since Presto is a distributed query processing engine whereas PostgreSQL is not.

Figure 8-2. The Presto connector connects to PostgreSQL over JDBC. The results from PostgreSQL are translated into Presto's internal format. Presto continues with the distributed execution as with any query, finally returning the results to the client.

If you do wish to push additional processing down to the underlying RDBMS source, you can accomplish this using views. If you encapsulate the processing in a view in PostgreSQL, it is exposed as a table to Presto, and the processing occurs within PostgreSQL. For example, let's say you created this view in PostgreSQL:

postgresql> create view my_view as
select state, count(*) as count_star
from airport
where country = 'US'
group by state;

When you run SHOW TABLES in Presto, you'll see this view, which you can query.

presto:airline> show tables;
Table
---------
airport
carrier
my_view
(3 rows)
presto:airline> select * from my_view;

Parallelism and Concurrency

Currently, all the RDBMS connectors use JDBC to make a single connection to the underlying data source. The data is not read in parallel, even if the underlying data source is a parallel system. For parallel systems, like Teradata or Vertica, one would have to write a parallel connector that can take advantage of how those systems distribute their data. Writing such a connector is nontrivial and a lot of work, which is likely why one does not yet exist in open source.

When accessing multiple tables from the same RDBMS, one JDBC connection is created and used for each table in the query. For example, if the query performs a join between two tables in PostgreSQL, Presto creates two separate connections over JDBC to retrieve the data, and the join is performed in parallel in Presto. As with aggregations, joins cannot be pushed down. However, the join can be encapsulated in a view so that it is performed in the underlying store, if desired.
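For example, a query like the following causes the connector to open one JDBC connection to scan airport and another to scan carrier; the join itself then runs inside Presto. The join condition is purely illustrative, as a hub_code column is not part of the earlier airline example:

```sql
-- Each table scan below uses its own JDBC connection to PostgreSQL;
-- Presto workers perform the join in parallel.
SELECT a.name, c.name
FROM postgresql.airline.airport AS a
JOIN postgresql.airline.carrier AS c
  ON a.code = c.hub_code;
```

Only the filtered columns and rows of each individual table scan are pushed down; the join operator itself always executes in Presto.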

Figure 8-3. Extension of Figure 8-2 depicting Presto having multiple connections to PostgreSQL when two tables are being queried.

Presto RDBMS Connectors

Currently, there are four RDBMS connectors in the Presto open source project. MySQL, PostgreSQL, AWS Redshift, and Microsoft SQL Server are already included in the plugin directory of Presto and ready to be configured. If you have multiple instances of a particular database, you can configure a separate catalog in Presto for each instance. You just need to name each *.properties file differently.

presto> show catalogs;
Catalog
------------
system
mysql-dev
mysql-prod
mysql-site
(4 rows)

There are some nuances among different RDBMSs. For example, in MySQL there is no difference between a database and a schema, unlike PostgreSQL, where there is a clear distinction: a PostgreSQL instance can contain multiple databases, which in turn contain schemas. Let's take a look at how each is configured in its *.properties configuration file.

connector.name=mysql
connection-url=jdbc:mysql://example.net:3306
connection-user=root
connection-password=secret

connector.name=postgresql
connection-url=jdbc:postgresql://example.net:5432/database
connection-user=root
connection-password=secret

connector.name=sqlserver
connection-url=jdbc:sqlserver://example.net:1433
connection-user=root
connection-password=secret

connector.name=redshift
connection-url=jdbc:postgresql://example.net:5439/database
connection-user=root
connection-password=secret

You'll notice that the PostgreSQL and AWS Redshift configurations are the same. In fact, Redshift uses the PostgreSQL driver (that is not a typographical error in the book!). Redshift was originally based on the open source PostgreSQL code, which is why the drivers can be shared. The SQL Server connection string looks similar to MySQL's, except that SQL Server does have the notion of both databases and schemas. There is a way to specify a named database in the connection string; if you don't specify it, Presto connects to the default database.
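For instance, a SQL Server catalog pinned to a specific database might look like the following; databaseName is the property used in Microsoft's JDBC connection string, and the database name here is illustrative:

```properties
connector.name=sqlserver
connection-url=jdbc:sqlserver://example.net:1433;databaseName=sales
connection-user=root
connection-password=secret
```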

Security

Currently, the only way to authenticate for the RDBMS connectors is by storing the username and password in the connector configuration file. As Presto is designed to be a trusted system, this should be sufficient for most uses. In order to keep it secure, it's important to control access to the machines and configuration files; they should be treated the same way as a private key. However, if you do not want to, or are unable to, store a password in cleartext, there are ways to pass the username and password through from the Presto client. We will discuss this further in {Chapter X} on Presto security.

Non-Relational Data Sources

Presto contains connectors to query a variety of nonrelational data sources, including key-value stores, column stores, stream processing systems, and document stores. These systems are often referred to as NoSQL databases. By having Presto connectors for these NoSQL data sources, you can run SQL queries on the data as if it were relational. This allows you to use applications such as business intelligence tools, or allows anyone who knows SQL, to query these data sources. Some of these data sources provide SQL-like languages, such as CQL for Cassandra. However, the completeness of these SQL variants is often limited, and they are not standard SQL. In addition, using these connectors allows you to perform joins, aggregations, subqueries, and other advanced SQL operations against these data sources, and even to query across them.

Key Value Stores

There are a number of key-value data stores that Presto has connectors for. Briefly, a key-value store is a system for managing a dictionary of records that are stored and retrieved using a unique key; imagine a hash table in which a record is retrieved by its key. The record may be a single value, multiple values, or some other collection. There are several key-value store systems that vary in their functionality. The list of data sources and connectors in Presto is always growing, and it is impossible to keep this book updated with all of them. Therefore, we will use Apache Accumulo to describe how Presto works with key-value stores; the general concepts should translate to the other key-value stores. It is always best to consult the online Presto documentation for the specifics of each connector.

Inspired by Google's BigTable, Apache Accumulo is a sorted, distributed key-value store for scalable storage and retrieval. Accumulo stores key-value data on HDFS, sorted by key.

Figure 8-4. The data model in Apache Accumulo consists of a key-value pair stored on HDFS.

In Accumulo, a key is not a simple value, but rather a triplet of row ID, column, and timestamp. Entries are sorted first by row ID and column in ascending lexicographic order, and then by timestamp in descending order. Accumulo can be further optimized by utilizing column families and locality groups. Most of this is transparent to Presto, but knowing the access patterns of your SQL queries may help you optimize how your Accumulo tables are created. This is no different from optimizing tables for other applications using Accumulo. Let's take a look at the logical representation of a relational table:

rowid flightdate flightnum origin dest
1234 2019-11-02 2237 BOS DTW
5678 2019-11-02 133 BOS SFO
... ... ... ... ...

Figure 5: Relational view of the data in Accumulo

Because Accumulo is a key-value store, it stores this data on disk differently from the logical view.

rowid column value
1234 flightdate:flightdate 2019-11-02
1234 flightnum:flightnum 2237
1234 origin:origin BOS
1234 dest:dest DTW
5678 flightdate:flightdate 2019-11-02
5678 flightnum:flightnum 133
5678 origin:origin BOS
5678 dest:dest SFO
... ... ...

Figure 6: View of how Accumulo stores the data

Unlike the relational database connectors we have seen previously, Accumulo's data model is not relational, and it is less straightforward for Presto to read from it. That is the purpose of connectors in Presto: the Presto Accumulo connector handles mapping the Accumulo data model into a relational one that Presto can understand.

Figure 8-5. Basic Accumulo architecture consisting of distributed Accumulo, HDFS, and ZooKeeper

At its core, Apache Accumulo is a distributed system that consists of a master node and a number of tablet servers. Tablet servers host tablets, which are horizontally partitioned pieces of a table. Clients connect directly to the tablet servers to scan the data that is needed. Accumulo uses Apache ZooKeeper to keep its metadata about the tables. The Presto Accumulo connector also uses ZooKeeper, since it is already available. Presto keeps information such as tables, views, table properties, and column definitions in ZooKeeper.

Figure 8-6. Inside Accumulo, demonstrating tablet servers and tablets

Let's take a look at how one can scan data in Accumulo from Presto. In Accumulo, key-value pairs can be read from a table using the Scanner object. The scanner starts reading from the table at a particular key and ends at another key (or at the end of the table). Scanners can also be configured to read only the exact columns needed. Recall that in the RDBMS connectors, only the needed columns were added to the SQL query pushed down to the database. Accumulo also has the notion of a BatchScanner object, which is used when reading from Accumulo over multiple ranges. This is more efficient, because it is able to use multiple threads to communicate with Accumulo.

Figure 8-7. Presto Connector reads data from Accumulo in parallel

The user first submits the query to the coordinator, and the coordinator communicates with Accumulo to determine the splits. It determines the splits by looking up ranges in the available index in Accumulo; we will discuss indexes in more detail below. Accumulo returns the row IDs from the index, and Presto stores these ranges in the splits. If an index cannot be used, there is one split for all the ranges in a single tablet. Last, the Presto workers use this information to connect to the tablet servers and pull the data in parallel from Accumulo, using the BatchScanner utility.

Once the data is pulled back by the Presto workers, it is put into the relational format that Presto understands, and the remainder of the processing is completed by Presto. In this case, Accumulo is used as the data store, and Presto provides the higher-level SQL interface for accessing the data in Accumulo.

If you were writing an application yourself to retrieve the data from Accumulo, you would write something similar to the following, setting the ranges to be scanned and the columns to fetch. The concept of pruning columns is similar to that of the RDBMS connectors, but instead of pushing down SQL, the Accumulo connector uses the Accumulo API to set which columns to fetch.

// Scan only the needed row IDs, fetching only the needed columns
ArrayList<Range> ranges = new ArrayList<Range>();
ranges.add(new Range("1234"));
ranges.add(new Range("5678"));
BatchScanner scanner = client.createBatchScanner("flights", auths, 10);
scanner.setRanges(ranges);
scanner.fetchColumn(new Text("flightdate"), new Text("flightdate"));
scanner.fetchColumn(new Text("flightnum"), new Text("flightnum"));
scanner.fetchColumn(new Text("origin"), new Text("origin"));
for (Entry<Key,Value> entry : scanner) {
    // populate into Presto format
}
scanner.close();

Using the Presto Accumulo Connector

As with the other connectors, the Accumulo connector requires a properties configuration file within the etc/catalog directory.

connector.name=accumulo
accumulo.instance=accumulo
accumulo.zookeepers=zookeeper.example.com:2181
accumulo.username=user
accumulo.password=password

Using our flights example from earlier, let’s create a table in Accumulo through Presto.

CREATE TABLE accumulo.ontime.flights (
rowid VARCHAR,
flightdate VARCHAR,
flightnum INTEGER,
origin VARCHAR,
dest VARCHAR
);

When you create this table, Presto creates a table in Accumulo and stores the table metadata in ZooKeeper. It is also possible to create column families. A column family in Accumulo is an optimization for applications that access columns together. By defining column families, Accumulo arranges how columns are stored on disk such that the frequently accessed columns, as part of a column family, are stored together. If you wish to create a table using column families, you can specify this as a table property.

CREATE TABLE accumulo.ontime.flights (
rowid VARCHAR,
flightdate VARCHAR,
flightnum INTEGER,
origin VARCHAR,
dest VARCHAR
)
WITH (
column_mapping = 'origin:location:origin,dest:location:dest'
);

Using the column_mapping table property, we are able to define a column family location with column qualifiers origin and dest (which are the same as the Presto column names).

Note

When the column_mapping table property is not used, Presto auto-generates a column family and column qualifier with the same name as the Presto column. You can observe the Accumulo column family and column qualifier by running the DESCRIBE command on the table.

The Presto Accumulo connector supports INSERT statements. For example, you could run the following SQL INSERT.

INSERT INTO accumulo.ontime.flights VALUES
('2232', '2019-10-19', 118, 'JFK', 'SFO');

This is a convenient way to insert data. However, throughput is currently low when writing data to Accumulo from Presto. For better performance, you'll want to use the native Accumulo APIs. The Accumulo connector has a number of utilities outside of Presto for assisting with higher-performance data insertion. You can find more information about loading data using the separate tool in the Presto documentation.

The table we created in the above example is referred to as an internal table. The Presto Accumulo connector supports both internal and external tables. The only difference between the two types is that dropping an external table deletes only the metadata and not the data itself. External tables allow you to create Presto tables for data that already exists in Accumulo. Furthermore, if you need to change the schema, such as adding a column, you can simply drop the table and re-create it in Presto without losing the data. It's worth noting that Accumulo can support this schema evolution because each row does not need to have the same set of columns. Using external tables requires a bit more work, because the data is already stored in a particular way. For example, you must use the column_mapping table property when using external tables.

CREATE TABLE accumulo.ontime.flights (
rowid VARCHAR,
flightdate VARCHAR,
flightnum INTEGER,
origin VARCHAR,
dest VARCHAR
)
WITH (
external = true,
column_mapping = 'origin:location:origin,dest:location:dest'
);

Predicate Pushdown in Accumulo

In the Accumulo connector, Presto can take advantage of the secondary indexes built in Accumulo. To achieve this, the Presto Accumulo connector requires a custom server-side iterator on each Accumulo tablet server. In addition to the configuration file in etc/catalog, you also need to copy the presto-accumulo JAR from the Presto code onto each tablet server at $ACCUMULO_HOME/lib/ext. You can find the exact details of how to do this in the documentation.

Indexes in Accumulo are used to look up row IDs, which can then be used to read the values from the actual table. Let's look at an example.

SELECT flightnum, origin
FROM flights
WHERE flightdate BETWEEN DATE '2019-10-01'
AND DATE '2019-11-05'
AND origin = 'BOS';

Without an index, Presto reads the entire dataset from Accumulo and then filters it within Presto. The Presto workers get splits that contain the Accumulo Range to read; this Range is the entire range of the tablet. When there is an index, the number of ranges to process can be significantly reduced.

2019-08-10 flightdate_flightdate:2232 []
2019-10-19 flightdate_flightdate:2232 []
2019-11-02 flightdate_flightdate:1234 []
2019-11-02 flightdate_flightdate:5678 []
2019-12-01 flightdate_flightdate:1111 []
SFO origin_origin:2232 []
BOS origin_origin:3498 []
BOS origin_origin:1234 []
BOS origin_origin:5678 []
ATL origin_origin:1111 []
... ... ...

Figure 10: Example index on the flights table

The coordinator uses the WHERE clause filters, flightdate BETWEEN DATE '2019-10-01' AND DATE '2019-11-05' AND origin = 'BOS', to scan the index and obtain the row IDs for the table. The row IDs are then packed into the splits that the workers later use to access the data in Accumulo. In our example, we have secondary indexes on flightdate and origin, from which we collect the row IDs {2232, 1234, 5678} and {3498, 1234, 5678}. We take the intersection of the two sets and know that we only have to scan row IDs {1234, 5678}. These ranges are then placed into the splits for processing by the workers.

rowid column value
2232 flightdate:flightdate 2019-10-19
2232 flightnum:flightnum 118
2232 origin:origin JFK
2232 dest:dest SFO
1234 flightdate:flightdate 2019-11-02
1234 flightnum:flightnum 2237
1234 origin:origin BOS
1234 dest:dest DTW
5678 flightdate:flightdate 2019-11-02
5678 flightnum:flightnum 133
5678 origin:origin BOS
5678 dest:dest SFO
3498 flightdate:flightdate 2019-11-10
3498 flightnum:flightnum 133
3498 origin:origin BOS
3498 dest:dest SFO
... ... ...

As we have seen, in order to take advantage of predicate pushdown, we need indexes on the columns we wish to push predicates down for. Through the Presto connector, indexing on columns is easily enabled by another table property.

CREATE TABLE accumulo.ontime.flights (
rowid VARCHAR,
flightdate VARCHAR,
flightnum INTEGER,
origin VARCHAR,
dest VARCHAR
)
WITH (
index_columns = 'flightdate,origin'
);

Streaming Systems and Document Stores

We have seen that the Presto Accumulo connector allows one to run SQL queries on a key-value store such as Apache Accumulo. In addition to key-value stores, there are other nonrelational (NoSQL) systems for which Presto has connectors.

Streaming, or publish-subscribe (pub/sub), systems are designed for handling real-time data feeds. For example, Apache Kafka was created at LinkedIn to be a high-throughput, low-latency platform. Publishers write messages to Kafka for subscribers to consume. Such a system is generally useful for data pipelines between systems. The Presto Kafka connector is used to read data from Kafka as a consumer; currently, you cannot use the connector to publish data.

Using the Presto Kafka connector, you can use SQL to query data on a Kafka topic and even join it with other data. Furthermore, using CREATE TABLE AS or INSERT SELECT, you can read data from a Kafka topic, transform it using SQL, and then write it to HDFS, S3, or other storage. AWS Kinesis is another stream processing system for which a Presto connector exists.
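As a sketch, such a pipeline query might look like the following; the kafka catalog, the topic-backed table, and the hive target table are all assumed names for illustration:

```sql
-- Read messages from a Kafka topic-backed table, summarize them with SQL,
-- and persist the result to distributed storage via the Hive connector.
CREATE TABLE hive.web.page_views_summary AS
SELECT url, count(*) AS views
FROM kafka.web.page_views
GROUP BY url;
```

This lets you use plain SQL as a lightweight transformation step between a streaming system and long-term storage.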

There is also a class of document stores, such as Elasticsearch and MongoDB, that support the storage and retrieval of information in JSON-like documents. These are different systems but may serve similar use cases, or entirely different ones. For example, Elasticsearch is better suited for indexing and searching documents, whereas MongoDB is a more general-purpose document store. Presto has connectors for both of these systems, allowing users to access them with SQL.

Other Connectors

There are a number of other connectors, and the list is constantly growing. Always refer to the Presto documentation at https://prestosql.io/ for an updated list of connectors and their functionality. We have learned in other chapters about the System connector, which provides metrics and other metadata about the Presto cluster. In the chapter on performance tuning, we saw how the JMX connector allows one to query Java Management Extensions (JMX) information about the Java Virtual Machine (JVM) and the Presto metrics it exposes; being able to use SQL here is incredibly useful for monitoring and troubleshooting. Last, we saw the TPC-H and TPC-DS connectors in the earlier Getting Started chapter. These built-in connectors generate the benchmark datasets on the fly, allowing one to quickly try out Presto. Furthermore, they are very useful for benchmarking when developing Presto itself.
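For example, assuming the TPC-H connector is configured as a catalog named tpch, you can query generated data immediately; tiny is one of the small scale-factor schemas the connector exposes:

```sql
-- The nation table is generated on the fly; no data loading is required.
SELECT name, regionkey
FROM tpch.tiny.nation
LIMIT 5;
```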
