Principle and Implementation of Klustron Horizontal Scaling (ScaleOut)

1. The Basic Core Capability of Distributed Databases - Scale Out

The evolution of database system architecture reflects the ever-growing scale of data processing in the information age.

Today, distributed database products, with their scalability and cost advantages over single-node databases, have become a widely used storage technology across industries.

Before we introduce the scaling function of Klustron, let's first review the common modes of database management system scaling.

The two common modes of database scaling are vertical scaling (also known as scaling up) and horizontal scaling (also known as scaling out).

- Vertical Scaling (Scale Up)

Vertical scaling refers to expanding physical resources such as storage capacity, computing power, and network bandwidth.

This scaling method is driven by physical-resource bottlenecks on a single machine: the database system's growing demand for resources is met by adding more resources to that machine. Its drawback is equally obvious: the physical resources of a single machine always have a ceiling, and high-end hardware is generally very expensive.

- Horizontal Scaling (Scale Out)

The basic idea of horizontal scaling is to distribute data to different physical devices according to certain rules, and the entire system still presents a single logical database service to the outside world.

The system expands by increasing the number of physical devices to improve the overall external service capabilities of the database. This scaling method can theoretically achieve unlimited expansion.

If the added physical nodes include storage capacity, the architecture is called shared-nothing; otherwise, it is called shared-storage.

Oracle RAC is a typical shared-storage architecture, in which all computing nodes share one set of storage services, whereas Klustron is a typical shared-nothing architecture.

From the perspective of read-write conflicts introduced by adding physical devices, a shared-nothing architecture stores data across multiple storage clusters with no overlap between shards, so there are no multi-point write conflicts, and its linear scalability therefore has an edge over shared-storage. However, core functions such as distributed transaction processing, distributed query processing, and automatic, application-transparent horizontal elastic scaling are essential in this architecture; only when all of them are in place can a system be considered a truly distributed database.

This is also why sharding middleware and application-layer sharding no longer meet today's requirements: these solutions lack the functions above and shift a heavy burden onto application developers and DBAs. Developers have to re-implement distributed transaction processing and distributed query processing case by case in application code, and DBAs have to stop the service manually to complete scaling, which poses huge risks to the stability and reliability of the application system and ultimately hurts business growth and the end-user experience.

When choosing Klustron's architecture, we took these problems fully into account. Drawing on our design and implementation experience with core distributed database functions such as distributed transaction processing, distributed query processing, and automatic horizontal elastic scaling, a deep understanding of the related user needs, and the pursuit of linear scalability, we built a business-transparent horizontal elastic scaling mechanism on a shared-nothing architecture so that Klustron can better meet the database demands of rapid business growth.

2. Introduction to ScaleOut Functionality of Klustron

2.1 Horizontal Scaling of Storage Layer

Klustron's storage service is logically composed of multiple independent MySQL clusters, each of which is called a shard.

Each shard is an independent disaster-recovery storage unit in Klustron's storage layer, and the data in different shards is mutually independent. Therefore, horizontally scaling the storage layer only requires adding such independent storage units, that is, adding new shards to the cluster.

After the new shard is added to the cluster, another important issue that needs to be addressed is how to migrate data from the original shard to the new shard, so as to improve the overall service capacity of the storage layer for data read and write requests and storage load.

To meet these requirements, Klustron implements a lock-free data shuffle (migration) service that reliably keeps the storage service continuous and has no impact on the application system. The migration service works for both expanding and shrinking a cluster: for example, computing and storage capacity can be increased several-fold ahead of a Double Eleven promotion and then released back to the public cloud platform after the promotion ends.

At the same time, this function can also be used for hot/cold data separation. In a Klustron cluster, several special storage shards (cold shards) can be prepared for cold data (low-value data, rarely used data, expired data, etc., such as bank customers' account statements from three years ago or an e-commerce system's user orders from a year ago). These shards use low-cost compute hardware and low-cost, large-capacity storage (such as SATA disks). When data expires, it is migrated from the hot shards to the cold shards. The data in the cold shards can still be read and written, but with lower performance and at a greatly reduced storage cost.

- Lock-Free Shuffle Service

When deciding which data to migrate, Klustron selects the tables and shards to be moved according to certain rules (to be introduced in subsequent articles) so as to identify the truly hot data, achieving more balanced traffic distribution and more efficient distributed queries. At the same time, we designed a lock-free data shuffle process so that a table remains readable and writable throughout the entire shuffle, with zero business impact.

The entire shuffle process consists of several steps. Assume that the current shuffle-set (the set of tables to be migrated) is being moved from shard2 to a new shard, shard-new.

**Step 1:** Complete the dump and load of the shuffle-set.

In the dump phase, a consistent snapshot point is recorded; it serves as the starting position for the subsequent binlog replication. The entire dump process does not block business requests. After the dump is completed, the dumped data files are loaded into the new shard in parallel.
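As a rough illustration of the dump phase (not Klustron's actual internal command, which node_mgr drives automatically), the sketch below shows how a consistent, parallel dump of the shuffle-set tables could be taken with mydumper; the host name, credentials, database, and table list are placeholder assumptions.

```bash
# Hypothetical host/credentials/tables; in Klustron, node_mgr drives this step itself.
SRC_HOST=shard2-replica          # standby node of shard2 that serves the dump
DUMP_DIR=/data/shuffle/dump

# Parallel, consistent dump of only the shuffle-set tables.
# mydumper records the binlog file/position of the snapshot in ${DUMP_DIR}/metadata,
# which later becomes the starting point for incremental binlog replication.
mydumper \
  --host "$SRC_HOST" --user repl_admin --password "$DUMP_PASS" \
  --database biz_db \
  --tables-list biz_db.table_a,biz_db.table_b \
  --threads 8 \
  --outputdir "$DUMP_DIR"

# The snapshot position needed for step 2 can be read back from the metadata file.
grep -A 3 "SHOW MASTER STATUS" "$DUMP_DIR/metadata"
```

The metadata file written by mydumper is what ties the exported data to a specific binlog position, which is exactly the "snapshot point" the text refers to.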

During the dump and load process, the cluster_mgr and node_mgr modules of Klustron's cluster management system work together to complete the data migration of tables in the shuffle-set.

The entire process is divided into three sub-tasks, each dispatched to the corresponding node_mgr:

- Dump_task

Cluster_mgr issues a dump task to the standby node of shard2, and the corresponding node_mgr uses the mydumper tool to execute the dump in parallel. When the task finishes, it reports completion to cluster_mgr.

- Transfer_task

After the dump task of the first phase is completed, cluster_mgr issues the transfer_task command to the node_mgr on the shard-new host, and shard-new pulls the corresponding dump files from shard2. When the download finishes, the node_mgr on shard-new reports completion to cluster_mgr (see the sketch after this list).

- Load_task

After the transfer_task completes, cluster_mgr issues the load_task to shard-new. During this task the node_mgr uses the myloader tool to load the data into shard-new in parallel and, on completion, reports success to cluster_mgr.
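To make the transfer_task and load_task sub-tasks concrete, here is a minimal sketch of the work node_mgr performs on the shard-new host, again with placeholder hosts, paths, and credentials; the actual transfer mechanism inside Klustron may differ.

```bash
# Hypothetical hosts/paths; inside Klustron these steps are carried out by node_mgr.
SRC_HOST=shard2-replica
DUMP_DIR=/data/shuffle/dump

# transfer_task: pull the dump files from the shard2 standby onto the shard-new host.
rsync -avz "${SRC_HOST}:${DUMP_DIR}/" "${DUMP_DIR}/"

# load_task: load the dumped shuffle-set tables into shard-new in parallel.
myloader \
  --host 127.0.0.1 --user repl_admin --password "$LOAD_PASS" \
  --database biz_db \
  --directory "$DUMP_DIR" \
  --threads 8
```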

**Step 2:** Establish data synchronization between the new shard and the source shard, applying all shuffle-set data changes since the snapshot point on the source shard to the new shard.

To establish the data synchronization link, Klustron uses MySQL's native binlog-based replication mechanism. It creates a temporary synchronization channel that covers only the shuffle-set tables and uses the dump snapshot as the starting point for synchronization. The incremental logs are pulled and replayed in parallel on shard-new until the overall synchronization delay stays within the predetermined range (3 seconds by default). After that, the table switching operation begins.
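The temporary synchronization channel can be pictured with MySQL's standard multi-source replication statements. The sketch below is only an approximation of what Klustron sets up internally: the channel name, hosts, credentials, and tables are placeholders, the binlog file and position come from the step 1 dump metadata, and per-channel replication filters require MySQL 8.0.

```bash
# Executed against shard-new (hypothetical host/credentials).
mysql -h shard-new-primary -u repl_admin -p"$ADMIN_PASS" <<'SQL'
-- Temporary channel that replicates only the shuffle-set tables from shard2's standby.
CHANGE REPLICATION FILTER
  REPLICATE_DO_TABLE = (biz_db.table_a, biz_db.table_b)
  FOR CHANNEL 'shuffle_tmp';

CHANGE MASTER TO
  MASTER_HOST = 'shard2-replica',
  MASTER_USER = 'repl',
  MASTER_PASSWORD = 'repl_password',
  MASTER_LOG_FILE = 'mysql-bin.000123',  -- binlog file from the dump snapshot
  MASTER_LOG_POS  = 456789               -- binlog position from the dump snapshot
  FOR CHANNEL 'shuffle_tmp';

START SLAVE FOR CHANNEL 'shuffle_tmp';

-- Poll the delay until it stays under the threshold (3 seconds by default in Klustron).
SHOW SLAVE STATUS FOR CHANNEL 'shuffle_tmp'\G
SQL
```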

The table switching operation renames the table on the source shard to cut off new requests from the business. If a computing node accesses the table, it finds that the table no longer exists and then looks up the table's new location in its own metadata, which cluster_mgr updates in a subsequent step.

After the rename operation completes on shard2, shard-new confirms that the rename has been replayed, and then the entire data synchronization channel is torn down. Suppose table A is in the shuffle-set: at this point shard2 contains a table A-renamed, and shard-new contains a completely identical table A-renamed (the rename is also synchronized from the source shard to the target shard).
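The switch itself boils down to a rename on the source shard, a check that the rename has been replayed on shard-new, and tearing down the temporary channel. The sketch below uses placeholder names; in Klustron these actions are coordinated by cluster_mgr and node_mgr.

```bash
# 1. On shard2: rename the table to cut off new business requests (placeholder names).
mysql -h shard2-primary -u repl_admin -p"$ADMIN_PASS" \
  -e "RENAME TABLE biz_db.table_a TO biz_db.table_a_renamed;"

# 2. On shard-new: poll until the rename has been replayed through the channel,
#    i.e. table_a_renamed now exists there as well.
mysql -h shard-new-primary -u repl_admin -p"$ADMIN_PASS" \
  -e "SHOW TABLES IN biz_db LIKE 'table_a_renamed';"

# 3. On shard-new: tear down the temporary synchronization channel.
mysql -h shard-new-primary -u repl_admin -p"$ADMIN_PASS" <<'SQL'
STOP SLAVE FOR CHANNEL 'shuffle_tmp';
RESET SLAVE ALL FOR CHANNEL 'shuffle_tmp';
SQL
```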

**Step 3:** Notify all computing nodes to update the routing information of the shuffle-set tables, after which the shuffle-set tables officially begin serving from the new shard.

Once cluster_mgr has notified the computing nodes to refresh their routing metadata, reads and writes of the shuffle-set tables are directed to shard-new, and the migration is complete.

2.2 Horizontal Scaling of Computing Layer

Klustron's computing nodes are designed to be stateless, meaning that they don't persist any cluster-related data locally.

Therefore, in the design and implementation of compute capacity scaling, Klustron has a natural advantage: computing node services can be rapidly deployed into the cluster. After a computing node service starts, it automatically synchronizes the relevant information from the metadata cluster, including routing information and storage node information.

Because all of this metadata is processed in memory, the window from starting a new computing node to serving external requests is very short, enabling agile horizontal scaling of compute capacity.

2.3 Fault-Tolerance and Rollback Mechanisms

As the scaling process above shows, the whole procedure involves multiple sub-tasks and multiple physical devices, so robust fault-tolerance and rollback designs are necessary to ensure high availability.

Based on the above scaling process design, the handling of failures at different stages is as follows:

- Dump file failure

If the dump fails, it is retried (3 times by default, configurable); see the retry-and-rollback sketch after this list.

- Transfer file failure

If the transfer of the data files dumped between physical devices fails, it can be retried (default 3 times, configurable).

- Load data failure

If the load operation fails, it can be retried (default 3 times, configurable).

- Table catch up failure

Table catch-up refers to establishing the data synchronization channel for the shuffle-set between the new shard and the source shard and replaying the incremental change log. If this process fails, an error message is returned for analysis, the process is terminated, and the shuffle-set data already copied to the new instance is cleaned up.

- Table switching failure

Once the table switching process begins, it must in principle succeed, barring disasters such as network isolation or a power failure of a physical device. Therefore, before executing the rename on the source shard, the implementation forcibly terminates sessions holding locks on the tables so that the rename cannot fail due to lock contention.

If the rename has not been replayed on shard-new within 3 seconds of its completion on the source shard (for example, because of a physical disaster), the process is terminated, the shuffle-set tables that were already renamed on the source instance are quickly renamed back, and external service resumes.
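The retry and rollback behavior above can be summarized in a short sketch. The retry limit, task scripts, hosts, and table names are all placeholders (cluster_mgr implements this logic internally): retryable sub-tasks are retried up to a configurable limit, and a failed catch-up or switch is rolled back by tearing down the channel, cleaning up shard-new, and renaming the shuffle-set tables back on the source shard.

```bash
# Hypothetical retry helper for the retryable sub-tasks (dump / transfer / load).
MAX_RETRIES=${MAX_RETRIES:-3}    # 3 attempts by default, configurable

retry() {
  local attempt=1
  until "$@"; do
    if [ "$attempt" -ge "$MAX_RETRIES" ]; then
      echo "task '$*' failed after $MAX_RETRIES attempts" >&2
      return 1
    fi
    attempt=$((attempt + 1))
    echo "retrying '$*' (attempt $attempt/$MAX_RETRIES)..." >&2
  done
}

retry ./run_dump_task.sh      || exit 1   # placeholder wrapper around mydumper
retry ./run_transfer_task.sh  || exit 1   # placeholder wrapper around the file transfer
retry ./run_load_task.sh      || exit 1   # placeholder wrapper around myloader

# Rollback path for a failed catch-up or table switch (placeholder names):
# drop the shuffle-set copy on shard-new, remove the temporary channel, and
# rename the tables back on the source shard (if they were already renamed)
# so that it resumes external service.
rollback() {
  mysql -h shard-new-primary -u repl_admin -p"$ADMIN_PASS" <<'SQL'
STOP SLAVE FOR CHANNEL 'shuffle_tmp';
RESET SLAVE ALL FOR CHANNEL 'shuffle_tmp';
DROP TABLE IF EXISTS biz_db.table_a, biz_db.table_a_renamed;
SQL
  mysql -h shard2-primary -u repl_admin -p"$ADMIN_PASS" \
    -e "RENAME TABLE biz_db.table_a_renamed TO biz_db.table_a;"
}
```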

3. Planning and Prospects

As Klustron continues to evolve and develop, the construction of its horizontal scalability capabilities will gradually become more refined and intelligent. Currently, related feature plans cover areas such as the selection of shuffle sets and optimization of shuffle processes for improved efficiency.

Specifically:

- More effective shuffle set construction algorithms

Shuffle-set construction aims to scale out hot shards more effectively and to improve the whole database service's ability to manage hot data; it is also an important enabler of flexible and efficient distributed query execution. This capability will therefore be a key focus of future iterations.

- Optimization of shuffle process

Fast and efficient shuffling helps improve the overall performance of the database. In future versions we will focus on shuffle efficiency, for example by attempting a pipeline-based data shuffling strategy.

-END-