klustron CDC 使用手册2
klustron CDC 使用手册2
01 概述
Klustron CDC(change data capture) 用于将Klustron分布式数据库集群中的各个存储集群(shard)的数据更新实时导出为事件流,每个shard一条事件流,供外部组件消费。例如,外部组件可以做数据流式导入,把Klustron的数据更新导入到第三方数据库系统,或者导出到flink等事件流分析系统;Klustron CDC还支持从第三方开源mysql中实时导出数据更新事件流,这样就可以流式导入数据到Klustron集群中。
Klustron CDC 使用MySQL binlog dump协议连接到 MySQL或者KunlunBase的存储节点对数据更新进行实时捕获和转换,然后输出为多种格式,包括JSON对象和SQL语句。对MySQL或者KunlunBase的存储节点(Klustron-storage) 实例来说,Klustron CDC组件就像一个备机节点。目前Klustron cdc 支持两种模式,一种是从Klustron集群导出数据,一种是支持开源mysql实例导出数据。
从Klustron-1.3版本开始,Klustron-cdc支持导出到Kafka消息队列。 从Klustron-1.3版本开始,Klustron-cdc支持并行导出SQL语句流到其他数据库系统以便数倍提升数据流导出速度。Klustron-CDC启动多个工作线程,每个工作线程使用独立连接到目标数据库,然后把目标数据表分为若干组分配给每个工作线程,实现并行导出。这要求每个表都要有主键或者唯一索引,以便导出过程中如果因为各种软硬件故障导致流程中断,那么Klustron-CDC可以正确地继续导出。
02 功能用法
阅读完本文后,可以参考CDC 用法示例来完成实际的数据流式导出配置操作。
2.1 Klustron集群导出
Klustron CDC根据dump 任务参数,连接到Klustron元数据集群,获取需要dump 数据的Klustron 集群中的shard 参数。如果有多个shard,CDC会自动为每个shard建立一条dump连接。Klustron CDC会选择shard节点中延迟最小的备节点dump数据。如果dump过程中shard发生主备切换,即当前dump节点为主节点或者dump节点挂掉等情况,CDC会自动断开当前dump连接,重新选择shard中其他延迟最小的备节点进行dump。
2.2 开源mysql导出
Klustron CDC根据dump任务参数,连接到需要dump的mysql节点。目前针对开源mysql,CDC会实时检测dump连接情况,如果dump过程中连接断开,例如网络问题,源mysql中kill连接等情况,CDC自动重新连接。
2.3 事件输出和转换插件
Klustron CDC采用插件方式将捕获的binlog 数据做转换和输出。Klustron CDC提供开发插件的API接口,用户可根据Klustron CDC 提供的API接口开发插件,然后挂载到Klustron CDC,来处理CDC 生成的事件。目前Klustron CDC发布时自带两个插件event_file和event_sql。
Event_file的功能是:直接将CDC输出json内容序列化存储到指定文件。
Event_sql的功能是:将CDC输出json内容转换成sql语句,直接发送给Klustron集群或者开源mysql
Klustron CDC将捕获数据同步目标存储时,能够保证数据不丢失。CDC会根据dump数据gtid信息,实时备份同步点。如果dump过程中CDC模块因为各种软硬件故障而退出,那么重新启动后能够自动根据上次同步点继续同步。CDC模块支持集群部署,实现高可用。
如果需要实现其他转换插件,则需要按照下述插件接口实现转换插件,然后挂载到Klustron CDC。
2.4 断点续传的数据一致性
Klustron CDC 是一个raft集群,其主节点负责事件流处理。如果Klustron CDC 的主节点异常退出,那么Klustron CDC集群会自动选出新的主节点继续工作。新的主节点从之前主节点退出之前最后保存的位置开始执行,位置保存时间间隔可以设置,默认是5秒。因此,目的端的 数据表必须有主键 ,才能确保不会重复执行SQL语句,否则新的CDC主节点会把上一个主节点在退出之前最后若干秒之内的所有操作会再次执行一遍,那样就会导致相关数据行被重复地插入、删除或者更新,导致目的端数据与源数据库中的数据不再一致,那样还可能会导致后续数据同步失败和无法继续。
2.5 事件输出接口说明
2.5.1 Klustron CDC输出的binlog 事件的 json格式
Klustron CDC把每个binlog事件输出为具有以下属性的json对象。
字段名 | 说明 |
---|---|
gtid | 当前事件gtid |
database | 数据库名 |
table | 表名 |
isDdl | 是否为ddl |
sql | Ddl 执行sql语句 |
event_type | 事件名称 |
data | 如果为insert,为插入每列对于数据 如果为delete,为删除数据, 如果为update,为更新后行 |
old | 如果为insert,空 如果为delete,空, 如果为update,更新前数据 |
2.5.2 Klustron CDC 支持输出的binlog 事件类型
字段名 | 说明 |
---|---|
CREATE_DB | 创建库 |
DROP_DB | 删除库 |
CREATE_TABLE | 创建表 |
DROP_TABLE | 删除表,该语句支持多个表同时进行,kunlun_cdc这个地方将多个表拆成多个drop table 记录 |
CREATE_INDEX | 添加索引 |
DROP_INDEX | 删除索引 |
ALTER_TABLE | 表添加,删除和更新字段等 |
RENAME_TABLE | 表重命名,该语句支持多个表同时进行,kunlun_cdc这个地方将多个表拆成多个rename table 记录 |
INSERT | 插入数据 |
DELETE | 删除数据 |
UPDATE | 更新数据 |
对于DDL语句,事件的json字段isDdl=1,sql记录当前ddl语句,例如:
{"event_type":"CREATE_TABLE","db_name":"test","sql":"create table t (a int primary key, b int)","isDdl":"1","table_name":"t","data":"","old":"","gtid":"77cf0403-fe85-11ed-87ad-fc3497a73395:5620"}
对于DML语句,json字段isDdl=0, 在data和old字段中记录具体内容。例如:
{"event_type":"INSERT","db_name":"test","table_name":"t","sql":"","isDdl":"0","gtid":"77cf0403-fe85-11ed-87ad-fc3497a73395:5621","data":[{"a":"1","b":"1"}], "old":""}
2.6 自定义插件开发以及挂载方法
1、需要在 Klustron 官网下载 Klustron CDC软件,在软件包的 include子目录下找到开发插件依赖的头文件dispatch_event.h
2、继承CDispatchEvent类,实现init/execute/close三个虚函数接口,在dispatch_event.h中有详细说明各个函数使用方法
3、讲代码在Linux操作系统中编译成 so 文件,放到kunlun_cdc安装包的plugin目录下。
4、在kunlun_cdc安装包的conf目录下kunlun_cdc.cnf中的plugin_so标签下添加新开发的插件名。
例如开发插件so名为event_test.so,那么在kunlun_cdc.cnf文件plugin_so标签下添加内容如下:
plugin_so = event_file,event_sql,event_test
特别注意:插件名之间用逗号隔开
5、修改完配置文件,需要重启Klustron CDC集群所有CDC进程才可以生效
由于插件用户自己开发,那插件需要输入参数,则在添加dump任务时output_plugins标签下输入该插件需要参数。
03 部署架构图
04 配置使用
4.1 通过API方式配置
4.1.1 添加dump数据任务(异步接口)
4.1.1.1 从Klustron集群dump数据
1)指定从具体位置开始dump,需要指定该cluster下各个shard开始dump具体位置,必须指定binlog_file,binlog_pos以及gtid_set。
curl -d '
{
"version":"1.0",
"job_id":"",
"job_type":"add_dump_table",
"timestamp":"1435749309",
"user_name":"kunlun_test",
"paras":{
"meta_db":"172.0.0.1:28001,172.0.0.2:28001,172.0.0.3:28001",
"meta_user":"xxx",
"meta_passwd":"xxxx",
"dump_db_type":"kunlunbase",
"cluster_name":"cluster_xxx_xxx",
"dump_tables":"postgres_$$_public.t1,postgres_$$_public.t2",
"shard_params":[
{
"shard_id":"1",
"dump_hostaddr":"127.0.0.1",
"dump_port":"28801",
"binlog_file":"xxx",
"binlog_pos":"899",
"gtid_set":"xxxx"
},{
"shard_id":"2",
"dump_hostaddr":"127.0.0.2",
"dump_port":"28802",
"binlog_file":"xxx",
"binlog_pos":"899",
"gtid_set":"xxxx"
}
],
"output_plugins":[
{
"plugin_name":"event_file",
"plugin_param":"/xxx/event.log",
"udf_name":"test1"
},
{
"plugin_name":"event_sql",
"plugin_param":"{\"hostaddr\":\"172.0.0.5\",\"port\":\"24002\",\"user\":\"xxxx\",\"password\":\"xxx\",\"log_path\":\"../log\"}",
"udf_name":"test2"
}
]
}
}' -X POST http://172.0.0.1:18002/kunlun_cdc
2)从当前添加dump任务时间点开始dump
curl -d '
{
"version":"1.0",
"job_id":"",
"job_type":"add_dump_table",
"timestamp":"1435749309",
"user_name":"kunlun_test",
"paras":{
"meta_db":"172.0.0.1:28001,172.0.0.2:28001,127.0.0.3:28001",
"meta_user":"xxx",
"meta_passwd":"xxx",
"dump_db_type":"kunlunbase",
"cluster_name":"cluster_xxx_xx",
"dump_tables":"postgres_$$_public.t1,postgres_$$_public.t2",
"output_plugins":[{
"plugin_name":"event_file",
"plugin_param":"/home/barney/kunlun_cdc/temp/event.log",
"udf_name":"test1"
},{
"plugin_name":"event_sql",
"plugin_param":"{\"hostaddr\":\"172.0.0.6\",\"port\":\"24002\",\"user\":\"xxx\",\"password\":\"xxx\",\"log_path\":\"../log\"}",
"udf_name":"test2"
}]
}
}
' -X POST http://172.0.0.1:18002/kunlun_cdc
4.1.1.2 从开源mysql集群dump数据,必须指定dump具体位置,即设置shard_params参数,如果不指定则添加任务失败。
curl -d '
{
"version":"1.0",
"job_id":"",
"job_type":"add_dump_table",
"timestamp":"1435749309",
"user_name":"kunlun_test",
"paras":{
"meta_db":"127.0.0.1:28001", --- dump mysql的ip:port
"meta_user":"xxx", --- 连接mysql的账户
"meta_passwd":"xxx", --- 连接mysql的密码
"cluster_name":"mysql",
"dump_db_type":"mysql",
"dump_tables":"test.t1,test.t2",
"shard_params":[{
"binlog_file":"xxx",
"binlog_pos":"899",
"gtid_set":"xxxx"
}],
"output_plugins":[
{
"plugin_name":"event_file",
"plugin_param":"/xx/event.log",
"udf_name":"test1"
},
{
"plugin_name":"event_sql",
"plugin_param":"{\"hostaddr\":\"172.0.0.2\",\"port\":\"24002\",\"user\":\"abc\",\"password\":\"abc\",\"log_path\":\"../log\"}",
"udf_name":"test2"
}
]
}
} ' -X POST http://172.0.0.1:18002/kunlun_cdc
4.1.1.3 从开源mariadb集群dump数据,必须指定dump具体位置,即设置shard_params参数,如果不指定则添加任务失败。
curl -d '
{
"version":"1.0",
"job_id":"",
"job_type":"add_dump_table",
"timestamp":"1435749309",
"user_name":"kunlun_test",
"paras":{
"meta_db":"127.0.0.1:28001", --- dump mariadb的ip:port
"meta_user":"xxx", --- 连接mariadb的账户
"meta_passwd":"xxx", --- 连接mariadb的密码
"cluster_name":"mysql",
"dump_db_type":"mariadb",
"dump_tables":"test.t1,test.t2",
"shard_params":[{
"binlog_file":"xxx",
"binlog_pos":"899",
"gtid_set":"xxxx"
}],
"output_plugins":[
{
"plugin_name":"event_file",
"plugin_param":"/xx/event.log",
"udf_name":"test1"
},
{
"plugin_name":"event_sql",
"plugin_param":"{\"hostaddr\":\"172.0.0.2\",\"port\":\"24002\",\"user\":\"abc\",\"password\":\"abc\",\"log_path\":\"../log\"}",
"udf_name":"test2"
}
]
}
} ' -X POST http://172.0.0.1:18002/kunlun_cdc
特别说明:add_dump_table时,如果不传dump_db_type,默认值为kunlunbase
4.1.1.4 从redis集群dump数据,必须指定dump具体位置,即设置shard_params参数,如果不指定则添加任务失败。
curl -d '
{
"version":"1.0",
"job_id":"",
"job_type":"add_dump_table",
"timestamp":"1435749309",
"user_name":"kunlun_test",
"paras":{
"meta_db":"127.0.0.1:28001", --- dump redis的ip:port
"meta_user":"xxx", --- 连接redis的账户,可选
"meta_passwd":"xxx", --- 连接redis的密码,可选
"cluster_name":"mysql",
"dump_db_type":"redis",
"dump_tables":"test.t1,test.t2",
"is_tls":"0", ---- 是否加密连接
"shard_params":[{
"binlog_file":"xxx",
"binlog_pos":"899",
"gtid_set":"xxxx"
}],
"output_plugins":[
{
"plugin_name":"event_file",
"plugin_param":"/xx/event.log",
"udf_name":"test1"
},
{
"plugin_name":"redis_sql",
"plugin_param":"{\"hostaddr\":\"172.0.0.2\",\"port\":\"24002\",\"user\":\"abc\",\"password\":\"abc\",\"log_path\":\"../log\"}",
"udf_name":"test2"
}
]
}
} ' -X POST http://172.0.0.1:18002/kunlun_cdc
特别说明:meta_user,meta_passwd配置是否使用账户/密码连接redis,如果dump的redis没有配置,则为空即可。is_tls为是否加密连接,默认为0即非加密连接。
shard_params标签下binlog_file为dump的redis id,binlog_pos为dump的redis位置。如果binlog_file/binlog_pos为空,则全量dump redis数据后再增量dump
如果redis数据dump到关系型数据库中,使用redis_sql插件。
4.1.1.5 从kunlunbase/mysql/mariadb dump数据时,支持配置是否先全量后增量方式
curl -d '
{
"version":"1.0",
"job_id":"",
"job_type":"add_dump_table",
"timestamp":"1435749309",
"user_name":"kunlun_test",
"paras":{
"meta_db":"172.0.0.1:28001,172.0.0.2:28001,172.0.0.3:28001",
"meta_user":"xxx",
"meta_passwd":"xxxx",
"dump_db_type":"kunlunbase",
"cluster_name":"cluster_xxx_xxx",
"dump_tables":"postgres_$$_public.t1,postgres_$$_public.t2",
"need_alldump":"1",
"output_plugins":[
{
"plugin_name":"event_file",
"plugin_param":"/xxx/event.log",
"udf_name":"test1"
},
{
"plugin_name":"event_sql",
"plugin_param":"{\"hostaddr\":\"172.0.0.5\",\"port\":\"24002\",\"user\":\"xxxx\",\"password\":\"xxx\",\"log_path\":\"../log\"}",
"udf_name":"test2"
}
]
}
}' -X POST http://172.0.0.1:18002/kunlun_cdc
特别说明:add_dump_table时,如果不传need_allldump,默认值为0则不做全量导出。
4.1.1.6 支持用户手动全量数据同步,然后CDC发起增量同步任务,并传入当前dump任务中已经存在的表结构
curl -d '
{
"version":"1.0",
"job_id":"",
"job_type":"add_dump_table",
"timestamp":"1435749309",
"user_name":"kunlun_test",
"paras":{
"meta_db":"172.0.0.1:28001,172.0.0.2:28001,172.0.0.3:28001",
"meta_user":"xxx",
"meta_passwd":"xxxx",
"dump_db_type":"kunlunbase",
"cluster_name":"cluster_xxx_xxx",
"dump_tables":"postgres_$$_public.t1,postgres_$$_public.t2",
"need_alldump":"0",
"table_structure_file":"xxxx.txt",
"shard_params":[{
"binlog_file":"xxx",
"binlog_pos":"899",
"gtid_set":"xxxx"
}],
"output_plugins":[
{
"plugin_name":"event_file",
"plugin_param":"/xxx/event.log",
"udf_name":"test1"
},
{
"plugin_name":"event_sql",
"plugin_param":"{\"hostaddr\":\"172.0.0.5\",\"port\":\"24002\",\"user\":\"xxxx\",\"password\":\"xxx\",\"log_path\":\"../log\"}",
"udf_name":"test2"
}
]
}
}' -X POST http://172.0.0.1:18002/kunlun_cdc
特别说明:如果不输入表结构文件,CDC默认从当前dump数据库的information_schema中获取表结构。就需要从全量完后到CDC dump到该表这段时间内不能有表结构变化。 表结构文件格式为: CREATE TABLE db_name
.table_name1
( .... ); CREATE TABLE db_name
.table_name2
( .... );
4.1.1.7 支持用户发起从tdengine通过CDC导出数据
curl -d '
{
"version":"1.0",
"job_id":"",
"job_type":"add_dump_table",
"timestamp":"1435749309",
"user_name":"kunlun_test",
"paras":{
"meta_db":"172.0.0.1:6030",
"meta_user":"xxx",
"meta_passwd":"xxxx",
"dump_db_type":"tdengine",
"cluster_name":"cluster_xxx_xxx",
"dump_tables":"xxx.t*",
"shard_params":[{
"binlog_file":"xxx.t1",
"binlog_pos":"ts>=\"2020-08-15 12:00:00.000\"",
"gtid_set":"ts"
},{
"binlog_file":"xxxx.t2",
"binlog_pos":"ts>=\"2020-08-15 12:00:00.000\" and ts < \"2024-01-01 01:00:00.000\"",
"gtid_set":"ts"
}],
"output_plugins":[
{
"plugin_name":"event_file",
"plugin_param":"/xxx/event.log",
"udf_name":"test1"
},
{
"plugin_name":"event_sql",
"plugin_param":"{\"hostaddr\":\"172.0.0.5\",\"port\":\"24002\",\"user\":\"xxxx\",\"password\":\"xxx\",\"log_path\":\"../log\"}",
"udf_name":"test2"
}
]
}
}' -X POST http://172.0.0.1:18002/kunlun_cdc
特别说明:1. 需要dump的表必须有时间戳字段, 2. 如果dump_tables中有多个表,在shard_params需要具体指出表(库名.表名)binlog_file字段, dump的时间段 binlog_pos字段中,dump表时间戳字段名 gtid_set中。 3. 如果dump为一段时间,右端必须时为非闭合
4.1.2 删除dump 数据任务(异步接口)
curl -d '
{
"version":"1.0",
"job_id":"",
"job_type":"del_dump_table",
"timestamp":"1435749309",
"user_name":"kunlun_test",
"paras":{
"meta_db":"172.0.0.1:28001,172.0.0.2:28001,172.0.0.3:28001",
"cluster_name":"cluster_xxxx_xxx",
"dump_db_type":"mysql|mariadb",
"dump_tables":"postgres_$$_public.t1,postgres_$$_public.t2"
}
}
' -X POST http://172.0.0.1:18002/kunlun_cdc
特别说明:新增dump_db_type参数,参数值为mysql或者mariadb。如果不传该参数,默认为mysql
4.1.3 获取当前CDC集群主节点(同步接口)
curl -d '
{
"version":"1.0",
"job_id":"",
"job_type":"get_leader",
"timestamp":"1435749309",
"user_name":"kunlun_test"
}
' -X POST http://172.0.0.1:18002/kunlun_cdc
4.1.4 获取当前CDC集群支持的同步目标插件(同步接口)
curl -d '
{
"version":"1.0",
"job_id":"",
"job_type":"list_support_plugins",
"timestamp":"1435749309",
"user_name":"kunlun_test"
}
' -X POST http://172.0.0.1:18002/kunlun_cdc
4.1.5 获取当前CDC集群中所有dump任务(同步接口)
curl -d '
{
"version":"1.0",
"job_id":"",
"job_type":"list_dump_jobs",
"timestamp":"1435749309",
"user_name":"kunlun_test"
}
' -X POST http://172.0.0.1:18002/kunlun_cdc
4.1.6 获取某个dump任务同步状态 (同步接口)
curl -d '
{
"version":"1.0",
"job_id":"",
"job_type":"get_job_state",
"timestamp":"1435749309",
"user_name":"kunlun_test",
"paras":{
"meta_db":"172.0.0.1:28001,172.0.0.2:28001,172.0.0.3:28001",
"cluster_name":"cluster_xxx_xxx",
"dump_tables":"postgres_$$_public.t1,postgres_$$_public.t2"
}
}
' -X POST http://172.0.0.1:18002/kunlun_cdc
4.1.7 获取异步任务状态接口(同步接口)
curl -d '
{
"version":"1.0",
"job_id":"xxx", --需要查询任务job_id
"job_type":"get_state",
"timestamp":"1435749309",
"user_name":"kunlun_test"
}
' -X POST http://172.0.0.1:18002/kunlun_cdc
4.1.8 获取CDC集群配置信息接口(同步接口)
curl -d '
{
"version":"1.0",
"job_id":"",
"job_type":"list_cdc_conf",
"timestamp":"1435749309",
"user_name":"kunlun_test"
}
' -X POST http://172.0.0.1:18002/kunlun_cdc
4.2 通过xpanel方式配置
4.2.1 上报CDC集群到xpanel
点击CDC服务,新增按钮
上报CDC服务成功。
4.2.2 添加CDC任务
点击CDC任务,新增按钮
设置dump任务数据源,从开源mysql导出。
设置同步点数据信息
点击确认保存
设置数据同步到目标源,可以配置多个目标源
1) 配置数据json文件
点击确认保存
2)配置同步到kunlunbase
点击确认保存
点击确认下发任务
4.2.3 删除CDC任务
从xpanel CDC任务页面找到对应业务
点击删除,输入验证码确认则发起删除任务
4.2.4 查看CDC同步状态
从xpanel CDC任务页面找到对应业务
点击详情可以查看具体同步情况
05 使用Klustron cdc 前置条件
dump的源mysql db 需要以下配置
- gtid_mode=ON,否则无法保证数据不丢失
- binlog_row_metadata=FULL,否则CDC无法正常工作
- binlog_row_image=FULL,建议设置为FULL。
06 自带插件说明:
6.1 event_file插件直接将CDC捕获的数据转成json写入文件。开发自定义插件时可以参考event_file插件输出。
event_file 插件输入参数为json内容具体写入的文件。添加dump任务时output_plugins字段中输入参数,例如
"plugin_name":"event_file", -- 插件名称 "plugin_param":"{"log_path":"xxx","log_name":"xxxx","log_size":"500"}", -- 表示json内容写入文件位置,文件名和每个文件大小 "udf_name":"test1" -- 扩展字段
6.2 event_sql支持将CDC捕获的数据转换成sql,写入目标db。目标db可以为Klustron集群或者开源mysql。默认写入Klustron集群,如果需要开源mysql需要在plugin_param中添加is_kunlun=0。
event_sql插件输入参数,添加dump任务时output_plugins字段中输入参数,例如
"plugin_name":"event_sql", --- 插件名称 "plugin_param":"{"hostaddr":"172.0.0.2","port":"24002","user":"abc","password":"abc","log_path":"../log"}",---event_sql插件输入参数 "udf_name":"test2" --- 扩容字段
特别说明:如果使用event_sql将源数据写入Klustron节点,plugin_param里面配置为Klustron计算节点mysql 端口,不能为pg端口。
6.3 parallel_sql 支持将CDC捕获的数据转出sql,并行写入目标db。目前设计为表级并行,即在同一个事物中,多个不同表可以并行写入目标db。并行度根据用户配置来决定。同样支持库表映射。并行度参数为thread_num,如果不配置默认为5
event_sql插件输入参数,添加dump任务时output_plugins字段中输入参数,例如
"plugin_name":"parallel_sql", --- 插件名称 "plugin_param":"{"hostaddr":"172.0.0.2","port":"24002","user":"abc","password":"abc","log_path":"../log","remap_rules":"test1.t1=>test2.b1,test1.t2=>test2.b2"}",---parallel_sql插件输入参数 "udf_name":"test2" --- 扩容字段
特别说明:插件参数plugin_param为json格式。使用api设置时需要注意json格式是否合法。
6.4 event_kafka 支持将CDC捕获的数据转成json写入kafka中。
event_kafka 插件输入参数,添加dump任务时output_plugins字段中输入参数,例如
"plugin_name":"event_kafka", --- 插件名称
"plugin_param":"{"brokers":"172.0.0.2:9002","log_path":"../log"}",---event_kafka插件输入参数,用户可以设置kafka topic,设置topic方式如下:
如果cdc当前为global模式,则根据添加dump任务中cluster_name来指定topic名,
例如:添加dump任务时指定cluster_name为test_abc,则手动配置topic时,test_abc对应的topic名为abc
如果cdc当前为shard模式,如果从kunlunbase集群导出数据时,则根据添加dump任务中cluster_name加上shard名来指定topic名
例如:添加dump任务时指定cluster_name为test_abc,并且当前cluster下有3个shard,则手动配置topic时,test_abc-shard_1对应
topic名为abc1,test_abc-shard_2对应topic名为abc2,test_abc-shard_3对应topic名为abc3,也多个shard指定相同topic名 "udf_name":"test2" --- 扩容字段
6.5 event_es 支持将CDC捕获的数据写入ES中 event_es 插件输入参数,添加dump任务时output_plugins字段中输入参数,例如
"plugin_name":"event_es", --- 插件名称 "plugin_param":"{"es_url":"172.0.0.2:24002","es_index":"xxx","es_version":"v7|v8","log_name":"xxxx","log_path":"../log"}",---event_es插件输入参数 "udf_name":"test2" --- 扩容字段
特别说明:目前event_es插件仅支持es版本为v7或者v8
6.6 event_mongodb 支持将CDC捕获的数据写入mongodb中 event_mongodb 插件输入参数,添加dump任务时output_plugins字段中输入参数,例如
"plugin_name":"event_mongodb", --- 插件名称 "plugin_param":"{"mongo_url":"172.0.0.2:24002","log_name":"xxxx","log_path":"../log"}",---event_mongodb插件输入参数 "udf_name":"test2" --- 扩容字段
6.7 event_tdengine 支持将CDC捕获的数据写入tdengine中 event_tdengine 插件输入参数,添加dump任务时output_plugins字段中输入参数,例如
"plugin_name":"event_tdengine", --- 插件名称 "plugin_param":"{"hostaddr":"172.0.0.2","port":"24002","user":"xxx","password":"v7|v8","log_name":"xxxx","log_path":"../log", "database":"xxxx"}",---event_tdengine插件输入参数 "udf_name":"test2" --- 扩容字段
6.8 event_rabbitmq 支持将CDC捕获的数据写入rabbitmq中 event_rabbitmq 插件输入参数,添加dump任务时output_plugins字段中输入参数,例如
"plugin_name":"event_rabbitmq", --- 插件名称 "plugin_param":"{"host":"172.0.0.2:24002","vhost":"xxx","exchange":"xxxx","login_user":"xxx","login_password":"v7|v8","log_name":"xxxx","log_path":"../log"},---event_rabbitmq插件输入参数 "udf_name":"test2" --- 扩容字段
特别说明: 支持指定routing_keys,格式为 "assign_routing_keys":[{"channel_name":"xxx", "routing_key":"xxxx"}],其中channel_name为dump任务中cluster_name值,routing_key为用户指定。 如果不支持routing_keys,在CDC默认值为cdc-rabbitmq-"$cluster_name"
6.9 redis_sql 支持将redis数据导入关系型数据库中(mysql/postgres等) redis_sql 插件输入参数,添加dump任务时output_plugins字段中输入参数,例如
"plugin_name":"redis_sql", --- 插件名称 "plugin_param":"{"host":"172.0.0.2","port":"12345","user":"xxxx","password":"xxx","database":"xxxx","table_name":"xxxx","log_name":"xxxx","log_path":"../log"},---event_rabbitmq插件输入参数 "udf_name":"test2" --- 扩容字段
特别说明:目标数据库中必须建立两张表,表名可以变化,表结构不能修改。如果表名变化后,需要输入参数中table_name中指定
CREATE TABLE `redis_repl_tb` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`db_id` int unsigned default '0',
`op_record` text,
PRIMARY KEY (`id`)
);
CREATE TABLE `kunlun_cdc_dump_state` (
`job_id` varchar(64) not NULL,
`repl_info` varchar(1024) default '',
PRIMARY KEY(`job_id`)
);
特别说明:通过调用api list_cdc_conf接口获取当前cdc模式。格式如下:
{"attachment":{"dump_binlog":{"allow_dump_shard_master":"0","dump_shard_max_delay":"1800","query_shard_state_interval":"10","report_cdc_sync_state_interval":"5","pending_binlog_event_num":"1000","dump_mode":"global","binlog_event_queue_len":"4096","reserve_binlog_log_dir":"../data/reserve_dir"},"cdc_ha":{"group_member":"127.0.0.2:18001,127.0.0.1:18001,127.0.0.3:18001","data_dir":"../data/paxosdata","log_dir":"../data/paxoslog"}},"version":"1.0","error_code":"0","error_info":"Ok","status":"Done"}
其中 dump_mode字段内容
七.Klustron CDC部署说明
- 获取kunlun_cdc安装包,解压到目标目录下。
- 到conf目录下修改kunlun_cdc.cnf文件
# Copyright (c) 2022 ZettaDB inc. All rights reserved.
# This source code is licensed under Apache 2.0 License,
# combined with Common Clause Condition 1.0, as detailed in the NOTICE file.
[Base_Config]
############################################
# base config
local_ip = 172.0.0.1
http_port = 18002
log_file_path = ../log/kunlun_cdc
log_file_size = 500
基础配置
Local_ip为本机ip地址
http_port为klustron CDC监听到端口
log_file_path和log_file_size 为kunlun_cdc日志配 置
[Binlog_Config]
############################################
# connect cluster shards strategy
allow_dump_shard_master = 0
dump_shard_node_max_delay = 1800
loop_query_shard_state = 10
loop_report_cdc_sync_state = 5
binlog_msg_queue_len = 1024
pending_binlog_event_num = 1000
reserve_binlog_event_dir = ../data/reserve_dir
kunlun_cdc dump binlog相关配置
allow_dump_shard_master是否允许从shard主上dump,默认为0,该配置在从kunlunbase集群dump数据时有效
dump_shard_node_max_delay 为dump的shard 备节点允许的最大延迟,如果该节点延迟大约配置值,则kunlun_cdc自动选择shard其他备节点。该配置在从kunlunbase集群dump数据时有效
loop_query_shard_state 查询dump shard 状态时间间隔
loop_report_cdc_sync_state dump表状态固化时间间隔
pending_binlog_event_num 表示在xa事务中,CDC默认可以缓存多少条binlog event,当大于配置值时,CDC将缓存binlog event消息写入磁盘
reserve_binlog_event_dir 表示CDC可以将缓存binlog event消息写入磁盘位置
[HA_Config]
############################################
# config paxos
ha_group_member = 172.0.0.1:18001,172.0.0.2:18001,172.0.0.3:18001
server_id = 2
paxosdata_dir = ../data/paxosdata
paxoslog_dir = ../data/paxoslog
paxosdata_compress = 0
paxosdata_write_buffer_size = 2
paxosdata_max_write_buffer_number = 2
paxosdata_min_writer_buffer_number_to_merge = 1
paxosdata_max_backgroup_compactions = 6
paxosdata_max_bytes_for_level_base = 64
paxosdata_target_File_size_base = 64
paxosdata_level0_slowdown_writes_trigger = 12
paxosdata_level0_stop_writes_trigger = 16
paxosdata_block_cache_size = 5
paxosdata_block_size = 64
paxosdata_bloom_filter_bits_per_key = 10
paxosdata_block_based_bloom_filter = 0
klustron CDC高可用配置
ha_group_member klustron CDC集群节点ip:port,个数要求为奇数
server_id为该节点ip在ha_group_member中位置,例如该机器为172.0.0.2,则server_id=2
[Plugin_Config]
############################################
plugin_so = event_file,event_sql
Klustron CDC插件配置。