Klustron Multi-Level Parallel Query Processing Technology

Background

This article describes how Klustron performs multi-level parallel execution of distributed query tasks across the entire cluster, making full use of the CPU cores, memory, and storage resources of both the compute and storage nodes to achieve excellent query processing performance. In the ideal case, Klustron can harness the resources of every server in the cluster to execute a single query statement.

Overview of Klustron Multi-Level Parallel Query Processing Technology

Klustron's multi-level parallel query processing architecture operates at three levels: within the compute node, between the compute and storage nodes, and within the storage node. At each level, multiple sub-tasks of the same user query statement can be executed in parallel. Let's briefly explore the parallel query mechanisms at each level:

  1. Within the Compute Node:

Klustron inherits and enhances PostgreSQL's parallel query architecture. PostgreSQL supports using multiple processes to execute qualifying query tasks in parallel. The optimizer decides whether to employ parallel query processing based on the characteristics of the query statement and data, as well as resource constraints. If parallel processing is chosen, it creates several worker processes and assigns them sub-tasks of the query plan that can be executed in parallel, achieving parallel query processing.

  2. Between Compute and Storage Nodes:

Klustron introduces a novel design to enable parallel queries between computing and storage nodes. It can work independently of the compute node's parallel query framework or cooperate with it.

When Klustron's compute node inserts, updates, or deletes multiple rows distributed across table partitions on multiple shards, it sends the data read and write statements to the target storage nodes asynchronously, instead of sending them to one storage node at a time and waiting for its results before moving on to the next. This allows multiple storage nodes to execute the statements in parallel, improving overall performance.

  3. Within Storage Nodes:

Klustron's team has developed advanced features in the storage node (Klustron-storage) to support multi-level parallel query processing. This allows multiple read-only query sub-tasks of the same query to be executed in parallel across multiple connections within the same storage node. Additionally, in Klustron-storage, we have implemented new high-performance incremental execution technology to improve the fetch() performance of prepared statements, significantly boosting the execution performance of queries containing limit clauses and subqueries (when they cannot be converted to semi-joins).

The following sections provide detailed explanations.

Klustron Compute Node Parallel Query Processing

In the PostgreSQL multi-process parallel query processing architecture, the processes that handle user connections are referred to as backend processes. They are responsible for receiving SQL requests from users, executing those requests, and ultimately returning the query results to the clients. As part of query execution, backend processes perform query optimization and sequential query execution.

If the query plan includes parallel executable sub-plans (PESPs), the backend process is also known as the leader process. As part of query execution, the leader process creates several worker subprocesses to execute different data intervals of the PESP, and each data interval is called a partial plan.

The leader process divides the PESP's work into partial plans and assigns them to worker processes for execution. Each worker process executes the same PESP tree structure over a different data interval, and each returns a non-overlapping sub-result set to the Gather or Gather Merge node, which collects the results from all workers.

For example, during query optimization, the leader process may convert a sequential scan node that scans 100,000 rows of data into four Parallel_seqscan subtasks, each scanning 25,000 rows. These subtasks are then assigned to four worker processes for execution. Each worker executes the Parallel_seqscan operation with different parameters, scanning different data intervals. The leader process may also act as a worker process and execute one of the subtasks. Once the subtasks are completed, the leader process executes the Gather or Gather Merge node to collect and merge the results from each worker process to produce the final result for the PESP, which is then passed to the higher-level nodes.
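As a minimal illustration of this inherited PostgreSQL behavior (the table name is hypothetical, and the plan output is abbreviated to its typical shape rather than an exact capture), a parallel aggregate over a plain PostgreSQL table looks like this:

-- Plain PostgreSQL example, not a Klustron Remote Plan: the Gather node
-- collects partial results from workers that each run a Parallel Seq Scan
-- over a different portion of the table.
EXPLAIN (COSTS OFF)
SELECT count(*) FROM big_table WHERE flag = true;

--  Finalize Aggregate
--    ->  Gather
--          Workers Planned: 4
--          ->  Partial Aggregate
--                ->  Parallel Seq Scan on big_table
--                      Filter: flag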

(Parallel Query Example)

The figure above illustrates an example of Klustron's distributed parallel query plan. The subsequent content will provide a more detailed analysis and explanation of its features.

Not all Plan nodes can be executed in parallel. The query optimizer generates query plans based on the parallel execution attributes of the Plan nodes, which fall into three categories: Parallel Safe, Parallel Restricted, and Parallel Unsafe.

  • Parallel Safe nodes can be part of the parallel execution sub-tree or located elsewhere in the query plan containing parallel execution plans.

  • Parallel Restricted nodes can only appear above Gather nodes, meaning they cannot be part of the parallel execution plan.

  • Parallel Unsafe nodes are not allowed in a parallel execution plan, and a query plan containing Parallel Unsafe nodes cannot be executed in parallel.

Commonly parallelizable nodes include Parallel Append, Partial Aggregate, Parallel Join, and Parallel Index/Bitmap/Sequential Scan. However, the three parallel scan types are not used in Klustron for querying user data, since user data is stored in the storage nodes and is not accessed through these scan nodes.

The query optimizer may consider converting a subtree of Plans to a parallel execution plan only when all Plan nodes within the subtree are marked as Parallel Safe. However, not all nodes in a parallel execution plan are split into multiple subtasks for execution. For example, the inner nodes of hash/nested loop/merge join are fully executed in each parallel worker, while only the outer nodes' query execution workload is divided into multiple subtasks for execution by multiple worker processes. Only the inner nodes of Parallel Hash Join are split among all workers for parallel execution and data sharing.

During both query optimization and query execution, parallel execution is subject to resource constraint variables, including max_worker_processes, max_parallel_workers_per_gather, and max_parallel_workers. These variables determine how many subtasks a query plan's parallelizable subtree will be split into by the query optimizer and how many worker processes will be involved in parallel execution.

(Parallel Query Function Switch and Resource Constraint Variables)

The figure above shows the configuration variables related to parallel execution in Klustron's compute node, Klustron-server, including the parallel-query switches and the limits on the number of parallel worker processes. The enable_parallel_remotescan switch is specific to Klustron-server; the remaining variables are inherited from PostgreSQL.
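For reference, these settings can be inspected or adjusted per session with standard SHOW/SET commands (the values here are examples only; enable_parallel_remotescan is the Klustron-specific switch, the rest come from PostgreSQL):

-- Inspect the limits inherited from PostgreSQL.
SHOW max_worker_processes;              -- server-wide cap, set in the configuration file
SHOW max_parallel_workers;
SHOW max_parallel_workers_per_gather;
-- Turn on Klustron's parallel Remote Plan scanning for this session.
SET enable_parallel_remotescan = on;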

These capabilities are inherited from PostgreSQL 11.5, and in Klustron-server (Klustron's compute node) we fully adopt and leverage them, extending them to distributed scenarios as detailed in the next section.

Parallel Execution of Remote Plan

Remote Plan is a new query plan node introduced in Klustron to retrieve user data from storage nodes. When the enable_parallel_remotescan switch is turned on, the Klustron-server optimizer can not only assign Remote Plans that scan different table partitions to multiple partial plans for parallel execution by multiple worker processes, but can also split a Remote Plan node that scans a single table partition or an unpartitioned table into multiple Remote Plan subtasks based on the ranges of rows to be scanned. In this way, even a Remote Plan that scans a single table can be executed in parallel.

(Detailed version of parallel query example 1)

From the diagram:

  1. Each worker performs RemotePlan node scans on one table partition of table t2, and the leader process also executes a RemotePlan.

  2. The three table partitions of t2 are located on the same shard, shard 3. The leader process and the three worker processes are each responsible for fetching data from one table partition. They connect to the shard's master node to fetch data, and these connections can use the same snapshot, ensuring transactional consistency of the returned data.

  3. The left node of the Hash join is executed in parallel (Parallel Append), while the right node is executed by each worker separately. Therefore, worker processes do not share data, leading to some resource waste.

  4. The Partial Aggregate node aggregates the rows returned by the Hash Join node for each worker. Each worker process executes a subtree rooted at Partial Aggregate. The data set scanned by each worker's subtree is determined by the Remote Plan of each partial plan.

  5. The Gather node collects the results returned by each worker process, and the leader process executes the Gather node and the part above it. The Gather node needs to pass the result set to the Finalize Aggregate to form the final Aggregate result using the partial aggregate results from each worker.

Additionally, in the parallel query processing scenario, Klustron-server can still leverage read-write separation technology to utilize the standby machines of shards for executing read-only queries. This is particularly suitable for OLAP scenarios, as it can avoid overloading the primary node.

During the parallel execution of Remote Plan, worker processes individually connect to the storage nodes to execute query tasks. To obtain consistent query results, these worker processes must use the same snapshot in their respective connections. Therefore, we have introduced the capability of snapshot sharing in Klustron-storage to support the execution of parallel distributed query plans in Klustron-server. This aspect will be explained in the following section.

Parallel Query Processing between Klustron Compute Nodes and Storage Nodes

In the previous section, we discussed parallel query processing within the compute nodes, which was limited to executing read-only query subtrees. Now, let's explore how Klustron achieves parallel query processing between the compute nodes and storage nodes. When insert/update/delete statements are dispatched to multiple shards, Klustron leverages asynchronous execution. As a result, each shard can receive and execute its respective statements nearly simultaneously, leading to the parallel execution of insert/update/delete operations.
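As a simple sketch (table and column names are hypothetical), a single statement whose affected rows span several shards is decomposed into per-shard statements that the compute node dispatches asynchronously:

-- The affected rows of this update live in partitions on several shards.
-- Klustron-server sends the per-shard update statements asynchronously, so
-- the shards execute them at roughly the same time instead of one by one.
UPDATE orders
SET status = 'archived'
WHERE order_date < '2022-01-01';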

Supported Features in Klustron Storage Nodes

The MySQL community edition's parallel query capabilities are rather limited and not utilized in Klustron. Instead, we have developed a series of technologies in Klustron-storage to enhance the performance of Klustron queries. Some of these technologies are employed in Klustron's parallel query processing, while others can be applied not only to parallel queries but also to serial query processing.

Transaction Snapshot Sharing Technology

Klustron introduces the innovative Transaction Snapshot Sharing Technology, which enables multiple worker threads in the storage nodes to execute multiple Remote Plans from the same query plan in the compute nodes. This allows the compute nodes and storage nodes to execute SELECT queries in parallel, and it serves as the foundation for the compute nodes' parallel execution of Remote Plans. Without it, the data seen by the multiple worker processes might not be consistent with the data observed by the main process (the backend or leader process) during serial execution, leading to incorrect query results.

When the compute node initiates a transaction, it instructs the storage node to create a snapshot in its main connection (backend process, Leader process). Then, in the connections of parallel worker processes, special commands are executed to replicate and utilize the snapshot from the main connection. This ensures that the data observed in the worker processes matches the data in the main connection, guaranteeing data consistency for the query results.

Prepared Statement

When a client connection executes a prepared statement, the compute node also prepares the corresponding statements on the target storage nodes. After the client binds parameters to its prepared statement, the compute node sends the corresponding parameter-binding commands to the storage nodes. This avoids repeatedly parsing frequently executed query statements.
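As a sketch using standard SQL-level prepared statements on the compute node (the table and statement names are hypothetical), the parsing and planning work is done once at PREPARE time, and only parameter binding and execution happen on each reuse:

-- Prepare once; as described above, Klustron-server can likewise prepare the
-- corresponding statements on the target storage nodes.
PREPARE get_user (bigint) AS
SELECT * FROM users WHERE id = $1;

-- Each execution only binds the parameter and runs the prepared statement.
EXECUTE get_user(42);
EXECUTE get_user(43);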

This advantage of using prepared statements becomes particularly evident in scenarios where incremental execution is utilized, as discussed in the next section.

Incremental On-Demand Execution Technology

In the community edition of MySQL, when execute() is called on a prepared statement, the entire query is executed and all query results are stored in a temporary table; subsequent fetch() calls then read result rows from this temporary table. This defeats the purpose of fetch(): incremental, on-demand execution that avoids unnecessary consumption of computing resources. Moreover, executing the entire query and storing the full result set in a temporary table can consume significant IO bandwidth and temporary storage space when the result set is large.

Many common SQL queries only need to fetch() part of the query result and then stop execution: queries with limit clauses, exists/any/some/in/not in subqueries that cannot be transformed into semi-joins, and queries whose conditions limit the number of rows needed from subqueries. The way the MySQL community edition executes prepared statements performs poorly for queries with these characteristics.
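For instance (hypothetical tables, shown only to illustrate the query shape), an EXISTS subquery that the optimizer cannot convert into a semi-join only needs the first matching row of its subquery for each outer row, which is exactly the pattern that benefits from stopping execution early:

-- Hypothetical tables; by the semantics of EXISTS, the subquery only has to
-- produce (or fail to produce) a single row per outer row, so executing it
-- incrementally avoids computing rows that are never needed.
SELECT c.id, c.name
FROM customers c
WHERE EXISTS (SELECT 1 FROM orders o
              WHERE o.customer_id = c.id AND o.amount > 1000);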

In Klustron-storage, we have designed and implemented an incremental, on-demand execution technique: the client (in Klustron, this is Klustron-server) fetch()es only as many data rows as it needs. With this technology, Klustron-storage achieves much better performance than MySQL and completely avoids the performance and resource overhead issues seen in the MySQL community edition.

For example, consider a partitioned table t1 with 100 million rows of data, split into 16 table partitions distributed across 4 shards, with id as the partition key. Execute the following query:

select * from t1 where age between 18 and 36 limit 1000;

With prepared statements in the compute node, the queries sent to the storage nodes only need to retrieve a total of 1000 rows from these 16 table partitions; the remaining rows contribute nothing to the query result. Without the aforementioned technique in Klustron-storage, those queries would be executed in full, significantly increasing execution time. Moreover, every row in t1 that meets the filter condition would be returned to the compute node; the compute node might receive millions of rows, only to return the first 1000 to the client and discard the rest. This wastes a considerable amount of computing resources (CPU time slices, network bandwidth, memory bandwidth).
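A minimal sketch of the setup for this example (the article does not show the DDL, so the column list and the partition placement are assumptions, and Klustron's actual syntax for placing partitions on shards may differ):

-- Hypothetical definition of t1: hash-partitioned on id into 16 partitions,
-- which Klustron distributes across the 4 shards.
CREATE TABLE t1 (
    id  bigint NOT NULL,
    age int
) PARTITION BY HASH (id);

CREATE TABLE t1_p0 PARTITION OF t1 FOR VALUES WITH (MODULUS 16, REMAINDER 0);
-- ... t1_p1 through t1_p15 are defined the same way with other remainders ...

-- With incremental on-demand execution, each storage node produces rows only
-- as the compute node fetches them, so roughly 1000 matching rows in total
-- are materialized instead of every row with age between 18 and 36.
SELECT * FROM t1 WHERE age BETWEEN 18 AND 36 LIMIT 1000;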

Klustron OLAP Query Processing Performance and Comparison

Next, we will conduct experiments to assess the performance of several typical queries under both parallel and serial execution scenarios, followed by an analysis of the underlying reasons for the performance differences. For the tests, we have set the basic configuration as follows: max_parallel_workers=8; max_parallel_workers_per_gather=4
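Expressed as session settings on the compute node, this test configuration is simply:

SET max_parallel_workers = 8;
SET max_parallel_workers_per_gather = 4;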

This means that each query subtree can involve up to 4 parallel worker processes, including the leader process. Before proceeding with the experiments, we have prepared the test data, which involves defining two data tables and inserting the required test data.

(Data Preparation - Defining Tables and Inserting Data)

  1. Two-table join with row filtering (referred to as Query 1; the SQL statement, query plans, and execution times are shown in the following images).

Query result: the parallel execution time is roughly one-third of the serial execution time (112 ms vs. 312 ms).

The speedup falls short of the expected four-fold improvement because three of the four table partitions are located on the same shard, shard 3, which becomes overloaded during parallel execution. In addition, the overhead of building the inner table's hash table is incurred by every worker and is therefore not diluted by parallel execution.

(Query 1 - Parallel Execution Plan and Time)

(Query 1 - Serial Execution Plan and Time)

  2. Two-table join with subsequent aggregate queries, one with grouping (Query 2) and the other without grouping (Query 3).

Query results are shown in the table below; the detailed SQL statements, query plans, and execution times are presented in the figures that follow.

Query      Parallel Execution Time (ms)    Serial Execution Time (ms)
Query 2    92                              250
Query 3    72                              247

Query 2 takes 20 milliseconds longer than Query 3 because each worker must perform a hash-based grouping operation and sort the resulting groups locally. In both Query 2 and Query 3, the leader process performs the Finalize Aggregate step on its own, and each worker independently builds the hash table for the inner side of the hash join from the returned data rows. The overhead of these operations cannot be diluted across the 4 worker processes, so the parallel execution time is slightly more than one-fourth of the serial execution time.
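For reference (the exact statements are in the figures below; these are only hypothetical shapes with made-up column names), Query 2 and Query 3 differ only in whether the aggregate over the two-table join has a GROUP BY clause, which is what adds the per-worker hash grouping and sorting work:

-- Hypothetical shape of Query 2: aggregate over a two-table join, grouped.
SELECT t1.some_col, count(*)
FROM t1 JOIN t2 ON t1.id = t2.t1_id
GROUP BY t1.some_col;

-- Hypothetical shape of Query 3: the same join and aggregate, ungrouped.
SELECT count(*)
FROM t1 JOIN t2 ON t1.id = t2.t1_id;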

(Query 3 - Parallel Execution Plan and Time)

(Query 2 - Parallel Execution Plan and Time)

(Query 2, 3 - Serial Execution Plan and Time)

Conclusion

Klustron's parallel query processing technology maximizes the potential of PostgreSQL's single-node parallel query processing capabilities and extends them to distributed query processing scenarios. Additionally, we have bolstered and expanded the query processing capabilities of the community edition of MySQL in Klustron-storage, providing a strong foundation for Klustron's distributed parallel query processing. The seamless collaboration between Klustron-storage and Klustron-server has resulted in impressive distributed parallel query processing performance.

Looking ahead, we will continue to enhance and strengthen Klustron's parallel query processing capabilities, fully utilizing the computational power of distributed database clusters to achieve even better query processing performance.