Chapter 3. Presto Architecture

A NOTE FOR EARLY RELEASE READERS

This will be the 4th chapter of the final book. Please note that the GitHub repo will be made active later on.

If you have comments about how we might improve the content and/or examples in this book, or if you notice missing material, please reach out to the author at [email protected].

Presto is a distributed SQL query engine resembling massively parallel processing (MPP) style commercial databases such as Teradata, Netezza, and Vertica. Leveraging this architecture, the Presto query engine is able to process SQL queries on large amounts of data in parallel across a cluster of computers, or nodes. Presto runs as a single server process on each node. Multiple nodes running Presto, configured to coordinate with each other, make up a Presto cluster. A typical Presto cluster is deployed on many nodes and can improve query performance by scaling out, that is, by adding nodes to the cluster. In {{Chapters X, Y, Z}} we will show how this architecture works well in cloud environments, given Presto's ability to scale out and in to optimize for cost.

In this chapter, we first discuss Presto's high-level architectural components. Understanding these is important whether you simply want a general understanding of how Presto works or you intend to install and manage a Presto cluster yourself. In the latter part of the chapter, we dive deeper into those components in the Query Execution Model section. That material matters most if you need to diagnose or tune a slow-performing query, or if you plan to contribute to the Presto open source project.

Presto Distributed Components

The Presto server can run in one of two roles: as a coordinator or as a worker. A typical Presto cluster consists of one coordinator and one or more workers. The coordinator and workers in the cluster communicate and transfer data with each other via Presto's REST API. Clients also communicate with Presto over the REST API.
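Whether a Presto server acts as a coordinator or a worker is controlled by its configuration. As a minimal sketch (the property names follow Presto's standard etc/config.properties conventions, but the hostname and port here are assumptions), a coordinator and a worker might be configured as follows:

```properties
# etc/config.properties on the coordinator
coordinator=true
node-scheduler.include-coordinator=false
http-server.http.port=8080
discovery-server.enabled=true
discovery.uri=http://coordinator.example.com:8080
```

```properties
# etc/config.properties on each worker
coordinator=false
http-server.http.port=8080
discovery.uri=http://coordinator.example.com:8080
```

The discovery.uri property tells each worker where to find the coordinator's discovery service.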

Figure 3-1. Node Presto Cluster

Presto Coordinator

The Presto coordinator can be thought of as the central brain of a Presto cluster. Users interact with the coordinator via the Presto command-line interface (CLI), applications using the JDBC or ODBC drivers, or any of the available client libraries for a variety of languages. The coordinator accepts statements from the client, such as SELECT queries, for execution.

Once it receives a statement, the coordinator is responsible for parsing, analyzing, planning, and scheduling the query execution across the Presto worker nodes. As the workers process the data, the results are sent to the coordinator for the clients to fetch. Because data is pipelined through the Presto query engine, clients continuously fetch data from the coordinator until query execution completes.

Figure 3-2. Clients communicate with the coordinator, while the workers process the data and produce the results returned to clients.

Presto Worker

If the Presto coordinator can be thought of as the central brain of the Presto cluster, then the Presto workers are its muscle. It is the job of a worker to execute the tasks scheduled by the coordinator in order to process the query. Workers process data by fetching it from connectors or from other workers.

Figure 3-3. Rightmost source workers process data provided by connectors. Downstream intermediate workers process data provided by other workers.

During installation, Presto workers are configured with the hostname or IP address of the coordinator. When a Presto worker starts up, it advertises itself to the coordinator as available for task execution.

Presto Connector Based Architecture

At the heart of the separation of storage and compute in Presto is its connector-based architecture. A connector provides Presto an interface to access an arbitrary data source. Presto provides a Service Provider Interface (SPI), which is a type of API used to implement a connector. A good analogy for the SPI is Java Database Connectivity (JDBC). JDBC is a standard API that Java programs use to interact with database systems. The raw interfaces of different database systems may all differ, but by providing a JDBC driver, each vendor lets programs interact with its system through the common API. Likewise, by implementing the SPI, a connector (or “driver”) gives Presto a standard interface for connecting to its data source.

Presto contains many built-in connectors, such as Hive, MySQL, PostgreSQL, SQL Server, Kafka, Cassandra, and Redis. In {{Chapter X}} you will learn about several of these connectors. The list is continuously growing, however, and it is impossible for this book to remain in sync with it; refer to the Presto documentation for the latest list of connectors.

Presto’s SPI also gives you the ability to create your own custom connectors. You may need one because there is no connector yet for your data source, in which case we encourage you to contribute it back; see {{Chapter X}} on the Presto community and how to contribute. A custom connector may also be needed if you have a unique or proprietary data source within your organization. This is what makes Presto truly “SQL-on-Anything.”

Figure 3-4. All Presto connectors are implemented using Presto’s SPI. The SPI also gives you the ability to write your own custom connectors.

Presto connectors are not separate processes, but rather plugins loaded into Presto. This is how both the built-in and custom connectors work. The necessary Java Archive (JAR) files are installed in a particular directory for Presto plugins. At startup, Presto loads the plugins as defined by the connector configurations. We will explore this more in {{Chapter X}} and {{Chapter Y}} as part of the Presto Connector and Presto SPI chapters.
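For example, the built-in MySQL connector is configured by placing a catalog properties file on each node. This is a sketch; the connection details are placeholders:

```properties
# etc/catalog/mysql.properties
connector.name=mysql
connection-url=jdbc:mysql://example.net:3306
connection-user=presto_user
connection-password=secret
```

At startup, Presto reads the files in the catalog directory and, for each one, loads the plugin named by the connector.name property.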

Catalogs, Schemas and Tables

A catalog in Presto is a configured instance of a connector. It is possible to have multiple catalogs for the same connector; for example, you may have more than one MySQL database you wish to query. Schemas in Presto exist within a catalog and contain a collection of tables. The connector implementation determines how schemas are mapped into the catalog. For example, with the Hive connector, a database in Hive is exposed as a schema in Presto.
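You can explore this hierarchy from any Presto client. The catalog and schema names below are assumptions for illustration:

```sql
SHOW CATALOGS;
SHOW SCHEMAS FROM hive;
SHOW TABLES FROM hive.web;
```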

A table in Presto is the same as a table in a relational database: it consists of rows and columns, with data types for those columns. The connector implementation determines how a table is mapped into a schema. For example, it is straightforward to expose PostgreSQL tables in Presto, but a connector to Kafka requires more creativity; in that case, a Kafka topic is exposed as a table in Presto.

Tables are accessed in SQL queries using a fully qualified name:

<catalog-name>.<schema-name>.<table-name>

SQL queries referencing multiple tables can access tables across different catalogs and schemas. This is what allows for query federation in Presto. We will learn more about query federation in {{Chapter X}}.
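As a hypothetical example of query federation (the catalog, schema, and table names are assumptions), a single query can join a table in a Hive catalog with a table in a MySQL catalog:

```sql
SELECT o.order_id, c.customer_name
FROM hive.sales.orders AS o
JOIN mysql.crm.customers AS c
  ON o.customer_id = c.customer_id;
```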

Catalog, schema, and table information is not stored in Presto; Presto does not have its own catalog. It is the responsibility of the connector to provide this information to Presto. Typically, this is done by querying the catalog of the underlying database or through other configuration in the connector. The connector handles this and simply provides the information to Presto when requested.

Query Execution Model

In the previous sections we discussed Presto's high-level architecture and its connector-based architecture. This section dives a little deeper into how Presto actually executes SQL statements. Learning the execution model provides the foundational concepts you need as you learn and use Presto. For example, these concepts are necessary if you want to understand how to tune Presto's performance for your particular workload.

Recall that the Presto coordinator accepts SQL statements from the end user, from either the CLI or software using the ODBC or JDBC drivers. The coordinator then parses, analyzes, plans, and schedules the SQL statement for execution across the Presto worker nodes.

Figure 3-5. How the Presto coordinator interacts with the connectors via the SPI

Let’s take a closer look at what’s inside the coordinator. When a SQL statement is submitted to the Presto coordinator, it arrives in textual format. The coordinator parses, analyzes, and plans that text, creating an internal Presto data structure we will refer to as the query plan. The query plan broadly represents the steps needed to process the data and return the results, per the SQL statement.

Figure 3-6. The Presto coordinator accepts the SQL text, then parses it and creates a query plan.
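You can inspect the query plan the coordinator creates by prefixing a query with EXPLAIN; the table name below is an assumption for illustration:

```sql
EXPLAIN
SELECT region, count(*)
FROM hive.sales.orders
GROUP BY region;
```

The output is the logical plan of operators Presto would use to answer the query.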

This query plan is further transformed into a distributed query plan consisting of one or more stages. When there is more than one stage, the stages form a dependency tree. The number of stages depends on the complexity of the query. If you’re familiar with Map-Reduce, you can think of the Presto stages as a series of mappers and reducers, but don’t take this analogy too literally, as there are many differences. For example, a Map-Reduce job has a map stage and a reduce stage, with data shuffled (or repartitioned) in between, similar in concept to how Presto and other distributed systems repartition data between stages. However, many complex queries require more than two stages. Map-Reduce then requires multiple jobs, with the data shared between jobs by writing it entirely to disk. This is very inefficient compared to Presto, where data is pipelined between stages rather than written to disk.

Figure 3-7. A distributed query plan defines the stages and how the query executes on a Presto cluster. The logical query plan goes through a transformation on the coordinator to create the distributed query plan.
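To see the stages of the distributed query plan, Presto supports a distributed variant of EXPLAIN; the table name here is an assumption:

```sql
EXPLAIN (TYPE DISTRIBUTED)
SELECT region, count(*)
FROM hive.sales.orders
GROUP BY region;
```

Each fragment in the output corresponds to a stage of the distributed plan.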

Stages are a logical representation of the distributed query plan. The coordinator uses them to further plan and schedule what are known as tasks across the Presto workers. A stage consists of one or more tasks, and typically many, where each task processes a piece of the data.

Figure 3-8. The Presto coordinator schedules tasks across a pool of Presto workers.

The unit of data that a task processes is called a split. Splits are units of work to be done by connectors; for example, the Hive connector describes a split in the form of a path to a file. At the source stage, splits are produced by the connectors, and tasks process this data via drivers. Tasks at the source stage produce data in the form of pages, which are collections of rows in columnar format. These pages flow to intermediate downstream stages. Pages are transferred between stages by exchange operators, which read the data from tasks within an upstream stage.

Figure 3-9. In Presto, data flows between stages as splits, which are units of work to be done by the workers.

To start query execution, Presto is given the list of splits by the connector via the SPI. Using this list, the coordinator starts scheduling tasks on Presto workers to process the data in the splits. During query execution, the coordinator tracks all splits available for processing and which tasks are running on workers and processing splits. As tasks finish and produce more splits for downstream processing, the coordinator continues to schedule tasks until no splits remain. We will discuss this in greater technical detail in {{Chapter X}} on Presto tuning.

Just as tasks represent the parallelization of a stage's processing across a network of workers, Presto's concept of a driver represents the parallelization of a task's processing within a worker node. A task may have one or more drivers, depending on the Presto configuration and environment. We will discuss this in greater technical detail in {{Chapter X}} on Presto tuning.

An operator processes input data and produces output data for a downstream operator. Example operators are table scans, filters, joins, and aggregations. A series of these operators forms an operator pipeline. For example, you may have a pipeline that first scans and reads the data, then filters it, and finally performs a partial aggregation. A driver is the finest unit of parallelism: it is an instantiation of the operator pipeline for a given split. As the operator pipeline produces data, the driver produces output that its task combines into output for consumption by a downstream task.

Figure 3-10. The driver is an instantiation of the operator pipeline for a given split. The operator pipeline processes the data and outputs pages for downstream tasks to process.