Introduction to Klustron Distributed Task Execution Framework
1. Background and Objectives
As a distributed database product, Klustron inherently requires multiple physical machines to work together: many cluster-level operations, such as scaling, rollback, adding shards, and data shuffling, must be carried out across different physical machines.
To complete such distributed collaborative tasks efficiently and reliably, the Klustron development team has implemented a distributed task execution framework based on the Cluster_mgr and Node_mgr cluster components. The core requirements it meets include, but are not limited to, cluster status maintenance, distributed transaction management, issuing and executing multi-node collaborative tasks, and querying and displaying cluster status. Its design goal is to abstract this type of multi-machine task well enough to achieve high concurrency, observable task status, and controllable task execution.
2. Interaction Model
ClusterManager is the cluster control component of KUNLUN DBMS, responsible for the multi-machine collaboration described above. It is an independent functional module that works together with the metadata cluster and NodeManager, and it runs statelessly on any physical machine in the cluster.
The frontend-to-backend interaction model of the distributed task execution framework is described below using the process of a cluster expansion task as an example.

- The user submits a cluster expansion task to ClusterManager via the web console and immediately receives a response, including the ID of the new task and whether the task has been accepted.
- The frontend polls ClusterManager with task-status query requests containing the task ID; ClusterManager queries the MetaCluster for the task information and responds to the frontend.
- NodeManager receives the task dispatched by ClusterManager and executes the relevant operations, updating the task status in the MetaCluster as it progresses.
- When the frontend sees the success or failure flag in a polling result, it stops polling and reports that the task has finished.
ClusterManager provides HTTP services to the outside world through a RESTful-style API, mainly for issuing cluster management tasks and querying their status (creating and deleting clusters, creating and deleting nodes, scaling up, rolling back, and so on).
Logically, ClusterManager provides only two service functions: task issuance and task status query. Accordingly, only two HTTP request methods are processed: GET and POST.
- GET: the JSON text in the HTTP body specifies the unique ID of the task whose status is requested; GET serves no other purpose. The response contains the task status information.
- POST: the JSON text in the HTTP body specifies the task type and parameters in order to issue a task. The response indicates whether the task was accepted and, if accepted, the global task ID assigned to it; the frontend can use this ID to poll for the task's execution status.
All HTTP responses from ClusterManager are sent immediately, without blocking. As for the URL, ClusterManager exposes only a single identifier to the outside world, http://ip:port/HttpService/Emit; other resource identifiers are not processed.
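As an illustration, the sketch below submits a task to the Emit endpoint with brpc's HTTP client and then polls its status once. The JSON field names (job_type, paras, job_id), the port, and the task-ID value are hypothetical placeholders, not ClusterManager's actual protocol:

#include <iostream>
#include <brpc/channel.h>

// Minimal sketch of the frontend side of the interaction model.
// Field names and values in the JSON bodies are hypothetical.
int main() {
  brpc::Channel channel;
  brpc::ChannelOptions options;
  options.protocol = brpc::PROTOCOL_HTTP;
  if (channel.Init("127.0.0.1:57000", &options) != 0) {
    std::cerr << "failed to init channel" << std::endl;
    return -1;
  }

  // 1. POST: issue the task. The response returns immediately and, if the
  //    task was accepted, carries the global task ID.
  brpc::Controller post_cntl;
  post_cntl.http_request().uri() = "/HttpService/Emit";
  post_cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
  post_cntl.request_attachment().append(
      R"({"job_type": "expand_cluster", "paras": {}})");
  channel.CallMethod(nullptr, &post_cntl, nullptr, nullptr, nullptr);
  if (post_cntl.Failed()) {
    std::cerr << post_cntl.ErrorText() << std::endl;
    return -1;
  }
  std::cout << "accepted: " << post_cntl.response_attachment() << std::endl;

  // 2. GET: query the task status with the returned task ID. The real
  //    frontend repeats this until the success/failure flag appears.
  brpc::Controller get_cntl;
  get_cntl.http_request().uri() = "/HttpService/Emit";
  get_cntl.http_request().set_method(brpc::HTTP_METHOD_GET);
  get_cntl.request_attachment().append(R"({"job_id": "some-task-id"})");
  channel.CallMethod(nullptr, &get_cntl, nullptr, nullptr, nullptr);
  std::cout << "status: " << get_cntl.response_attachment() << std::endl;
  return 0;
}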
3. Model for Task Issuing and Execution
ClusterManager's task processing is purely asynchronous in both its frontend and backend interactions. This design ensures that ClusterManager can continuously provide cluster management services without being blocked by long-running sub-tasks.
ClusterManager's asynchronous characteristics are mainly reflected in the following aspects:
- Task reception and initialization are asynchronous. When the frontend issues a POST task, ClusterManager synchronously performs only simple validity checks (whose logic is simple and will not block for long), then immediately responds to the frontend's HTTP request. The actual initialization and dispatch of the task are handled asynchronously in the backend.
- Backend task dispatch and execution are handled asynchronously through coroutines. Although each individual RPC call to a NodeManager is still synchronous, the overall processing flow is asynchronous, so the HTTP service as a whole is never blocked.
(Figure: asynchronous task dispatch and execution flow)
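A minimal sketch of this hand-off, assuming the ClusterRequest class defined in section 4.1 below and a hypothetical DispatchRequestAsync() helper; the long-running work is carried by a bthread (brpc's user-space thread, on which coroutine-style execution is scheduled), so the HTTP handler returns at once:

#include <bthread/bthread.h>
// Assumes the ClusterRequest class defined in section 4.1 below.

// Body executed inside the bthread: drives the long-running phases.
static void *RunRequestInBthread(void *arg) {
  auto *request = static_cast<ClusterRequest *>(arg);
  request->DealRequest();  // main task logic, may issue RPCs
  request->TearDown();     // finishing work; invokes TearDownImpl()
  return nullptr;
}

// Hypothetical dispatch helper called by the HTTP service after the
// cheap validity checks; it returns immediately, so the service is
// never blocked by a long-running sub-task.
bool DispatchRequestAsync(ClusterRequest *request) {
  request->SetUp();  // lightweight, serial preparation phase
  bthread_t tid;
  return bthread_start_background(&tid, nullptr,
                                  RunRequestInBthread, request) == 0;
}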
4. Abstract Interface of Distributed Tasks
4.1 REQUEST Object - Corresponding to Business Request Processing
For each POST request, a REQUEST entity is generated inside ClusterManager. This entity drives the execution of sub-tasks, persists task information, and performs related functions. It is defined as follows:
class ClusterRequest : public kunlun::ErrorCup {
public:
  ...
  void SetUp();
  // Derived class should implement it.
  // Invoked by SetUp().
  virtual void SetUpImpl() = 0;
  virtual void DealRequest() = 0;
  // The response will be sent in TearDown().
  void TearDown();
  // Derived class should implement it.
  // Invoked by TearDown().
  virtual void TearDownImpl() = 0;
  // getters & setters
  void set_status(RequestStatus);
  RequestStatus get_status();
  std::string get_request_unique_id();
  void set_request_unique_id(std::string &);
  RequestBody get_request_body();
  ClusterRequestTypes get_request_type();
  bool ParseBodyToJson(const std::string &);
  // forbid copy
  ClusterRequest(const ClusterRequest &) = delete;
  ClusterRequest &operator=(const ClusterRequest &) = delete;

private:
  ...
};
The processing flow of a Request is fixed and consists of three steps: SetUp() / DealRequest() / TearDown(), corresponding to the task preparation, task execution, and task completion phases respectively.
Different tasks subclass this interface class and fill in the SetUpImpl() / DealRequest() / TearDownImpl() methods; this customized logic is invoked during the task execution flow.
SetUp: this phase must be lightweight and runs serially in a single thread. Serial synchronous execution in the SetUp phase makes it easy to handle mutually exclusive tasks; for example, if a task may only exist and execute once globally, the corresponding check-and-claim logic can be lifted into the SetUp phase.
DealRequest: this is where the main task logic executes (although, when necessary, that logic can be lifted into SetUp). It runs in a coroutine, but it does not wait for RPC responses asynchronously; since synchronous waiting does not affect the main logic, synchronous execution is used to keep the programming simple. If multiple NodeManagers need to execute instructions concurrently, that concurrency can be encapsulated inside a REMOTETASK while the upper-level call remains a synchronous interface.
TearDown: this is for global finishing work, such as notifying the compute nodes and updating the metadata cluster. It also runs in a coroutine.
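As an illustration, a concrete task type only needs to subclass ClusterRequest and fill in the three customization points. The class name and method bodies below are hypothetical, not Klustron's actual code, and constructor details are omitted:

// Hypothetical example of a concrete request type; only the three
// customization points are filled in.
class BackupClusterRequest : public ClusterRequest {
public:
  // Preparation phase: runs serially, so exclusive checks are safe here.
  void SetUpImpl() override {
    // e.g. validate parameters and claim a global "backup running" flag
  }
  // Execution phase: runs in a coroutine; may call NodeManager via RPC.
  void DealRequest() override {
    // e.g. drive the remote backup steps one by one
  }
  // Completion phase: global finishing work.
  void TearDownImpl() override {
    // e.g. update the task status in the metadata cluster
  }
};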
4.2 REMOTETASK Object - Remote Task
Each REQUEST may involve multiple remote tasks; an expansion, for example, involves dumping data, transferring data, loading data, establishing synchronization, verifying the result, and disconnecting synchronization.
Each of these steps must be executed on one or more machines in turn, and each corresponds to an RPC. A REMOTETASK object corresponds one-to-one to such a remote RPC. It is defined as follows:
class RemoteTask : public kunlun::ErrorCup {
public:
  RemoteTask();
  ~RemoteTask();

private:
  RemoteTask(const RemoteTask &) = delete;
  RemoteTask &operator=(const RemoteTask &) = delete;

public:
  bool InitRemoteTask(int timeout_sec);
  bool AddNodeSubChannel(brpc::Channel *sub_channel);
  ...
  // sync run in bthread
  bool RunTaskImpl();
  void RecordeSubChannleReturnValue(const brpc::Controller *cntl, int index);
  bool AllSubChannelSuccess();
  void InitPara(std::string &action, std::string &paras, std::string &para_tag);

private:
  brpc::ParallelChannel remote_channel_;
  ...
  std::string action_;
  std::string action_paras_;
  std::vector<std::string> sub_channel_return_info_vec_;
  rapidjson::Document return_info_json_array_;
  // Different NodeManager nodes may need different paras for the same
  // action; this tag is used to pick the right ones.
  std::string action_paras_tag_;
};
Here, RunTaskImpl() is a synchronous remote RPC client call that executes inside a coroutine. Because it runs in a coroutine, the synchronous model has no adverse effects and does not block the external services.
4.2.1 Remote RPC for Single Device
Each device is represented in ClusterManager by a Channel object, which can be understood as an encapsulation of a remote HTTP server. Therefore, to prepare a REMOTETASK for a single-device remote RPC, simply select the appropriate Channel, add it with AddNodeSubChannel(), and then initialize the task parameters, as the sketch below shows.
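A minimal sketch of the single-device case, reusing the RemoteTask interface shown above; the action name, parameters, port, and timeout are illustrative:

#include <string>
// Assumes the RemoteTask class defined above.

// Hypothetical single-device usage; 'channel' is a brpc::Channel already
// initialized to the target NodeManager's address.
bool RunOnSingleNode(brpc::Channel *channel) {
  RemoteTask task;
  if (!task.InitRemoteTask(/*timeout_sec=*/60)) return false;
  task.AddNodeSubChannel(channel);  // one channel -> one device

  std::string action = "dump_data";          // illustrative action name
  std::string paras = R"({"port": 57001})";  // illustrative parameters
  std::string para_tag = "";                 // unused for a single device
  task.InitPara(action, paras, para_tag);

  // Synchronous from the caller's view, but it runs inside a coroutine,
  // so the HTTP service is not blocked.
  return task.RunTaskImpl() && task.AllSubChannelSuccess();
}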
4.2.2 Remote RPC for Multiple Devices
In certain scenarios, a command must be executed on multiple machines simultaneously, and the task succeeds only if every device succeeds; otherwise it fails and terminates. To support this, instantiating the REMOTETASK simply requires adding the corresponding channels one by one. If different machines need different parameters for the same task, tags can be used to distinguish them, as in the sketch below.
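A sketch under the same assumptions as above, fanning one action out to several NodeManagers, with per-node parameters distinguished by a tag; the tag name and parameter layout are illustrative:

#include <string>
#include <vector>
// Assumes the RemoteTask class defined above.

// Hypothetical multi-device usage: one action fanned out to all channels.
bool RunOnAllNodes(const std::vector<brpc::Channel *> &channels) {
  RemoteTask task;
  if (!task.InitRemoteTask(/*timeout_sec=*/600)) return false;
  for (auto *ch : channels) {
    task.AddNodeSubChannel(ch);  // one sub-channel per device
  }

  std::string action = "load_data";
  // Illustrative: per-node parameters keyed by tag so that each
  // NodeManager picks the paras that belong to it.
  std::string paras = R"({"node_0": {}, "node_1": {}})";
  std::string para_tag = "node_id";
  task.InitPara(action, paras, para_tag);

  // RunTaskImpl() drives a brpc::ParallelChannel call and waits for all
  // sub-channels; AllSubChannelSuccess() checks that every node succeeded.
  return task.RunTaskImpl() && task.AllSubChannelSuccess();
}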
4.3 MissionRequest Object - Complex Task Orchestration and Execution
A complex business request may involve multiple stages of tasks, and each stage may require multiple physical devices to execute related commands through multi-node asynchronous RPC. A facility for orchestrating such complex remote tasks is therefore needed. MissionRequest inherits from ClusterRequest and combines it with the functionality of REMOTETASK to implement a unified task execution and orchestration model.
class MissionRequest : public ClusterRequest {
  typedef ClusterRequest super;

public:
  MissionRequest(google::protobuf::RpcController *cntl_base,
                 const HttpRequest *request, HttpResponse *response,
                 google::protobuf::Closure *done)
      : super(cntl_base, request, response, done) {
    task_manager_ = nullptr;
  }
  virtual ~MissionRequest();
  virtual void SetUpImpl() override final;
  // Derived class should add its remote-task arrangement logic here.
  virtual bool ArrangeRemoteTask() = 0;
  // Derived class should add its setup logic here.
  virtual bool SetUpMisson() = 0;
  virtual void DealRequest() override final;
  virtual void TearDownImpl() override final;

private:
  TaskManager *task_manager_;
};
void MissionRequest::SetUpImpl() {
  ArrangeRemoteTask();
  SetUpMisson();
}

void MissionRequest::DealRequest() {
  // Iterate over the arranged remote tasks and run them one by one.
  auto &task_vec = task_manager_->get_remote_task_vec();
  for (auto iter = task_vec.begin(); iter != task_vec.end(); iter++) {
    bool ret = (*iter)->RunTaskImpl();
    if (!ret) {
      setErr("%s", (*iter)->getErr());
      return;
    }
  }
}

void MissionRequest::TearDownImpl() {}
For example, in the expansion scenario, only a new Expand class needs to be implemented. It inherits from MissionRequest and implements SetUpMisson() / ArrangeRemoteTask() / TearDownImpl() (DealRequest() is already fixed by MissionRequest), which unifies the entire expansion task flow into the existing task framework.
The logic that ArrangeRemoteTask() needs to implement is to instantiate RemoteTask objects as needed and append them to the task queue one by one. When the request is actually executed, they are taken from the queue and run one after another, as the sketch below illustrates.
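A sketch of such a subclass. The class name, step names, and the PushBackRemoteTask()/get_task_manager() accessors are hypothetical stand-ins for however TaskManager exposes its queue, not Klustron's actual API:

// Hypothetical expansion mission; constructor arguments are forwarded as
// in the MissionRequest definition above.
class ExpandClusterMission : public MissionRequest {
public:
  using MissionRequest::MissionRequest;  // reuse the base constructor

  bool SetUpMisson() override {
    // e.g. validate and parse source/target shard parameters
    return true;
  }

  bool ArrangeRemoteTask() override {
    // Instantiate the remote tasks in execution order and queue each one.
    auto *dump = new RemoteTask();
    // ... AddNodeSubChannel() / InitPara() for the dump step ...
    get_task_manager()->PushBackRemoteTask(dump);

    auto *transfer = new RemoteTask();
    // ... channels and parameters for the transfer step ...
    get_task_manager()->PushBackRemoteTask(transfer);

    // ... load_data, set_up_sync, check_result, cut_off_sync ...
    return true;
  }

  void TearDownImpl() override {
    // e.g. notify compute nodes and refresh metadata-cluster state
  }
};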
5. Planning and Prospects
Currently, the cluster expansion feature has been developed on top of the task coordination framework described above. In future versions, all cluster management interfaces will be migrated to this framework.
At the same time, to better support complex task execution models and meet more cluster management needs, the distributed task scheduling framework is continuously being improved, for example to support remote sub-tasks with dependencies and to plan and manage sub-tasks dynamically. The Klustron team's goal is to simplify the implementation of complex business logic through sound engineering practices, lay a solid engineering foundation for more stable and reliable cluster management features, and enable Klustron to serve customers better and bring them greater value.