Klustron(原KunlunBase) 分布式任务执行框架介绍
Klustron(原KunlunBase) 分布式任务执行框架介绍
一、背景及目标
Klustron 作为一款分布式数据库产品,原生具有多个物理设备协同工作的诉求。很多集群层面的操作如扩容,回档,增加分片,数据 shaffle 等都需要在不同的物理设备上完成。
为了使这种分布式协同任务能够高效可靠的完成,Klustron 开发团队在 Cluster_mgr 和 Node_mgr 这两个集群组件的基础上,实现了一个高效的分布式任务执行框架,其满足的核心需求包括但不限于集群状态维护,分布式事务管理,多节点协同任务的下发与执行,集群状态的查询和展示等;其实现目标是能够将此类型的多机任务进行良好的抽象,以达到并发程度高,任务状态可观测,任务执行可控制的目的。
二、交互模型
ClusterManager 是 KUNLUN DBMS 的集群管控组件,承担上述多设备协同工作的任务,是独立的功能模块,与元数据集群、NodeManager 协同工作,无状态的运行在集群内任意物理设备上。
下面以集群扩容任务的流程来描述分布式任务执行框架的前后端交互模型。

- 用户(User)通过 Web 控制台下发扩容任务到 ClusterManagger 后会立刻收到应答,包括当前下发的任务的 ID,是否该任务被接受等信息。
- 前台通过轮训的方式,向 ClusterManager 发送包含任务 ID 的任务状态查询请求,ClusterManagger 通过查询元数据集群(MetaCluster)获取任务信息,并应答给前台。
- NodeManager 接收到任务后,开始执行相关操作,并在过程中向元数据集群更新任务状态。
- 前台在任务状态轮训的结果中发现成功或者失败标志,即结束轮训,标识当前任务结束。
ClusterManager以 RESTful 风格 API 向外提供http服务,目前主要以集群管控任务(新建删除集群,新建删除节点,扩容,回档)的下发及相关状态查询功能为主。
ClusterManager 对外提供 HTTP 服务,逻辑上只提供两种服务功能: 任务下发和任务状态查询。相应的,只处理两种 HTTP 请求方法:GET 和 POST。
GET HTTP body 中通过 json 文本指定唯一的任务 id ,来获取任务状态。不承担其他功能。应答包括任务状态信息。
POST HTTP body 中通过 json 文本指定任务相关类型及参数,来下发任务。应答包括了是否任务被接收,和 被接受后任务被赋予的全局的任务 ID。前端可以用这个 ID 去轮训的获取任务执行状态。
所有 ClusterManager 的HTTP 应答都会立刻发出,不会有阻塞。URL 部分,ClusterManager 对外只提供唯一的标识 http://ip:port/HttpService/Emit ,其他资源标识不做处理。
三、任务的下发与执行模型
ClusterManager 任务处理是面向前后端的纯异步框架。这样实现的目的是为了能够保证 ClusterManager 能够持续的对外提供集群管控服务,不会因为子任务的长时间执行而导致server阻塞。
ClusterManager 的异步化特征主要体现在以下几个方面:
- 任务的接收和任务初始化是异步的 前台下发的 POST 任务,ClusterManager 只会同步做简单的合法性校验逻辑(逻辑简单,不会有长时间的阻塞)然后就会立刻给前台 HTTP 应答。具体任务的内部初始化和下发,会被后台异步化的处理。
- 后台任务的下发执行通过协程异步处理
虽然单个与 NodeManager 的 rpc 仍然是同步等待调用,但是整体的处理流程是异步的,所以整个 HTTP服务不会阻塞。如下图所示: 
四、分布式任务的抽象接口
4.1 REQUEST 对象-对应业务请求处理过程
每一个 POST query 都会在 ClusterManager 中生成一个 REQUEST 实体。该实体会承担执行子任务,持久化信息等功能,定义如下:
class ClusterRequest : public kunlun::ErrorCup {
public:
...
void SetUp();
// Derived class should implament it
// Invoked by SetUp()
virtual void SetUpImpl() = 0;
virtual void DealRequest() = 0;
// the response will be sent in TearDown()
void TearDown();
// Derived class should implament it
// Invoked by TearDown()
virtual void TearDownImpl() = 0;
// getter & setter
void set_status(RequestStatus);
RequestStatus get_status();
std::string get_request_unique_id();
void set_request_unique_id(std::string &);
RequestBody get_request_body();
ClusterRequestTypes get_request_type();
bool ParseBodyToJson(const std::string &);
// forbid copy
ClusterRequest(const ClusterRequest &) = delete;
ClusterRequest &operator=(const ClusterRequest &) = delete;
private:
...
};
对一个 Request 来说,处理这个 request 的流程是固定的,包含三个步骤:SetUp() / DealRequest() /TearDown()。每个节点对应任务准备阶段,执行任务,任务收尾阶段。 
不同的任务,需要实例化该接口类,并且填充 SetUpImpl/DealRequest/TearDownImpl 方法,这些自定的逻辑会在任务执行流程中被调用。
SetUp SetUp 需要轻量,这里是串行单线程执行的。 这里针对 SetUp 阶段的执行,采用串行同步执行的目的,是为了方便处理如,排他性任务的需求,如需要执行一个全局只能唯一存在并执行的任务,就可以将任务逻辑上提到 SetUp 阶段。
Deal 这里是执行主要任务逻辑的位置(当然需要的情况下,这里的逻辑可以上提到 SetUp),这里会在协程中展开,但是这里不会异步的等 rpc应答,仍然采用同步的方式。因为这里的同步等不会影响主逻辑,为了简化编程难度,采用同步即可。如果有多个 NodeManager 同时异步执行指令的需求,可以再 REMOTETASK 中封装,这里上层调用,仍然是同步的接口。
TearDown 这里是做全局的收尾工作,比如通知计算节点,更新元数据集群等。这里同样会在协程中展开。
4.2 REMOTETASK 对象-远程任务
每一个 REQUEST 可能涉及到多个远程任务,例如扩容就包含 DUMP 数据、传输数据、LOAD 数据、建立同步、校验结果、断开同步。
这其中的每一个流程都需要在不同的机器上,一个接一个的执行,每一个都对应一个 RPC 过程。REMOTETASK 对象与上述的远程 rpc 一一对应。定义如下:
class RemoteTask : public kunlun::ErrorCup {
public:
RemoteTask();
~RemoteTask();
private:
RemoteTask(const RemoteTask &) = delete;
RemoteTask &operator=(const RemoteTask &) = delete;
public:
bool InitRemoteTask(int timeout_sec) ;
bool AddNodeSubChannel(brpc::Channel *sub_channel);
...
// sync run in bthread
bool RunTaskImpl() ;
void RecordeSubChannleReturnValue(const brpc::Controller *cntl, int index) ;
bool AllSubChannelSuccess() ;
void InitPara(std::string &action, std::string ¶s, std::string ¶_tag);
private:
brpc::ParallelChannel remote_channel_;
...
std::string action_;
std::string action_paras_;
std::vector<std::string> sub_channel_return_info_vec_;
rapidjson::Document return_info_json_array_;
// for multi nodemanager action has different paras,
// use this tag to get the right paras
std::string action_paras_tag_;
};
其中 RunTaskImpl() 是一个同步等的远程 rpc 客户端,在协程中执行。这里用同步模型不会有不良影响,本身不会阻塞对外服务。
4.2.1 单设备远程 RPC
每一个设备在 ClusterManager 中都有一个 Channel 代表,该对象可以理解为一个远程 HTTPserver 的封装,因此实现一个单设备远程 RPC 的 REMOTETASK 需要的准备工作,只需要选择符合要求的 Channel 并执行 AllSubChannelSuccess(),然后初始化任务参数即可。 
4.2.2 多设备远程 RPC
有些场景需要在多个机器上同时执行命令,并等待所有的设备成功或者有失败发生就报错终止。这里对应到 RemoteTask 实例化的操作上,只需要多次添加对应的 channel 即可~。如果有不同机器执行任务的参数不同的情况,可以用 tag 加以区分。 
4.3 MissionRequest 对象-复杂任务编排与执行
一个复杂的业务请求,可能涉及到多个分阶段的任务;每个任务,也可能需要涉及到多个物理设备同时执行相关指令的多节点异步 RPC。因此需要一个能够实现复杂远程任务编排的功能。 MissionRequest 继承自 ClusterRequest , 并且组合了 REMOTETASK 的功能,实现统一的任务执行模型和任务编排模型。 
class MissionRequest : public ClusterRequest {
typedef ClusterRequest super;
public:
MissionRequest(google::protobuf::RpcController *cntl_base,
const HttpRequest *request, HttpResponse *response,
google::protobuf::Closure *done)
: super(cntl_base, request, response, done) {
task_manager_ = nullptr;
}
virtual ~MissionRequest();
virtual void SetUpImpl() override final;
// user should add arrange remote task logic
virtual bool ArrangeRemoteTask() = 0;
// user shold add setup logic here
virtual bool SetUpMisson() = 0;
virtual void DealRequest() override final;
virtual void TearDownImpl() override final;
private:
TaskManager *task_manager_;
};
void MissionRequest::SetUpImpl() {
ArrangeRemoteTask();
SetUpMisson();
}
void MissionRequest::DealRequest() {
// do the request iterator vec
auto &task_vec = task_manager_->get_remote_task_vec();
auto iter = task_vec.begin();
for(;iter != task_vec.end();iter++){
bool ret = (*iter)->RunTaskImpl();
if (!ret){
setErr("%s",(*iter)->getErr());
return false;
}
}
return true;
}
void MissionRequest::TearDownImpl() {}
例如在扩容场景中,只需要新实现一个 Expand 实例,该实例继承自 MissionRequest,然后对SetUpMission()/DealRequest()/TearDownImpl()/ArrangeRemoteTask()进行实现,即可将整个扩容任务流程统一到现有的任务框架中。
其中 ArrangeRemoteTask() 需要实现的逻辑就是,按需逐个实例化 RemoteTask 然后放入队列中即可。实际执行的时候,会逐个从队列中取出执行。 
五、规划与展望
目前,集群扩容功能已经基于上述任务协调框架开发完成,后续版本特性上,所有的集群管理接口都会向该框架进行迁移。
同时为了更好地支持复杂的任务执行模型以支撑更多的集群管理需求,分布式任务调度框架也在不断完善,例如支持有依赖关系的远程子任务、支持以动态规划的方式自动的编排管理子任务等模型等,昆仑团队的目标就是要以更合理的工程实践抽象来简化复杂业务逻辑实现的难度,为提供更加稳定可靠的集群管理特性打下良好的工程基础,使 Klustron 能够更好地服务客户,为客户带来更大的价值。