原文链接:https://www.gbase.cn/community/post/5412
更多精彩内容尽在南大通用GBase技术社区,南大通用致力于成为用户最信赖的数据库产品供应商。
数据同步功能的使用
参数配置
使用kafka consumer需要按照如下方式进行配置,可变更参数的配置参考补充说明。
··配置gcluster参数[l1]
/opt/gcluster/config/gbase_8a_gcluster.cnf
_gbase_transaction_disable=1(注意一定不要用0)
gcluster_lock_level=10(不建议用2)
_gcluster_insert_cache_buffer_flag=1
gcluster_assign_kafka_topic_period=20
gcluster_kafka_max_message_size=1000000
gcluster_kafka_batch_commit_dml_count=100000
gcluster_kafka_local_queue_size=210000
gcluster_kafka_consume_batch=100
gcluster_kafka_user_allowed_max_latency=15
··可变更参数说明:
·gcluster_assign_kafka_topic_period,自动接管consumer的时间周期,单位为秒,例如A节点宕机了,最大需要等待gcluster_assign_kafka_topic_period秒之后,A节点负责的同步任务会被其他节点接管。最小值20s,最大值120s。
·gcluster_kafka_max_message_size,从kafka topic获得消息的最大长度,单位为字节,最大值1000000000字节,这个值需要大于等于kafka server的配置(message.max.bytes),否则可能造成消费问题,即,如果kafka队列中存在一条消息,其大小超过gcluster_kafka_max_message_size就会造成消费卡住。
·gcluster_kafka_batch_commit_dml_count,一次提交dml操作的数量,适当调大能明显提高性能,但是如果一个topic涉及的表很多(几百个表)则建议该参数调小,表越多越应该调小,调小的目的是使得一次提交命中的表少一些,具体需要结合具体用户场景、同步速度、资源占用情况具体对待。 未来启用新事务后,表数量多对性能的影响会降低,会再次更新手册。需要注意的是,此参数是一个意向值,程序未必会严格按照此参数来提交,比如如果一个事务包含大量DML操作,那么程序必须确保事务完整性;再比如从kafka取消息、解析消息的速度慢于往单机提交数据的速度,那么程序也会选择先提交,而不是一定要等待满足gcluster_kafka_batch_commit_dml_count参数。
·gcluster_kafka_user_allowed_max_latency,允许消息在8a集群层缓存多长时间,超时之后必须马上提交,单位是毫秒。此参数与gcluster_kafka_batch_commit_dml_count作用类似,都是决定什么时候提交的。多攒一些数据再提交,有利于降低磁盘占用,如果用户对数据延迟不太敏感,而对磁盘占用比较敏感,可以通过这个参数来调节。典型值一般可以设置为50000~20000,需要注意提交动作本身也需要消耗时间。
·gcluster_kafka_local_queue_size,储存dml操作的队列的长度,建议至少为gcluster_kafka_batch_commit_dml_count的二倍多一些。
·gcluster_kafka_consume_batch,consumer一次读取kafka消息的条数。如果kafka队列里的消息size较小,可以设大,反之设小,此参数对性能的影响不大,所以一般没必要设太大,建议设为10~1000。
··配置gnode参数
/opt/gnode/config/gbase_8a_gbase.cnf
_gbase_transaction_disable=1(注意一定不要用0)
gbase_tx_log_mode=ONLY_SPECIFY_USE(注意一定不要用USE,STANDARD_TRANS)
gbase_buffer_insert=1024M
gbase_tx_log_flush_time=5
··可变更参数说明:
gbase_buffer_insert,insert buffer的大小,随gcluster_kafka_batch_commit_dml_count的设置进行调整,如果数据量大,且consumer任务多,建议调大。需要保证单机insert buffer足够,否则会导致异常。
gbase_tx_log_flush_time,单机内存数据刷新频率,单位为秒。建议设为5秒。
注:如果现场之前使用的是“gbase_tx_log_mode=USE,STANDARD_TRANS”这种配置(之前开发过程中使用事务模式的配置,后面已放弃这种配置),则在修改参数“gbase_tx_log_mode= ONLY_SPECIFY_USE”后,最好把之前同步数据涉及的表重导一遍数据(否则可能会报错)。如果数据量较大也可以配置gnode参数“_gbase_bsi_check_disable=1”绕过这个检查报错逻辑(这种情况产生的报错并不会造成什么问题。关掉检查的缺点是可能会在后面如果有其他bug引起错误后,无法及时报错)。
注意各节点的配置都要一致,只改动部分节点可能产生未知错误。
启停命令
以下命令在任意Coordinator节点,通过gccli执行。
创建consumer task
Create kafka consumer <consumer_name> transaction topic <kafka_topic_name> brokers ’ip:port, ip:port,…’;
Consumer_name:消费任务的名称。唯一,不允许重复,最大长度64 bytes。
kafka_topic_name:需要消费的kafka topic名称,最大长度64bytes。
ip:kafka broker的ip。
port:kafka broker的port。
限制:不允许两个consumer task使用相同的topic+brokers组合。
示例,创建名为test1的consumer task,从topic_1消费数据:
Create kafka consumer test1 transaction topic topic_1 brokers ’10.10.10.10:9092,10.10.10.11:9092’;
删除consumer task
Drop kafka consumer <consumer_name>;
删除前需要先确保consumer task处于停止状态,否则会报错。
查看consumer task属性
show kafka consumer <consumer_name>;
查看单个consumer的属性,属性即为创建consumer task时指定的参数。
show transaction consumer;
查看全部进行数据同步的consumer属性。
启动consumer task
start kafka consumer <consumer_name>;
执行后会启动指定的consumer,从上一次的位置继续同步。
所有已经启动的consumer task,其运行在哪个节点是由任务调度线程来分配的,不一定运行在命令所在的节点,可以查看consumer task运行状态来获得其运行在哪个节点(IP地址)。
只要用户没有下stop命令来停止consumer task,consumer task就会一直运行,即使把集群全部stop,当再次start集群的时候,consumer task又自动继续运行。
停止
stop kafka consumer <consumer_name>;
执行后会停止指定的consumer。
状态查询
select * from information_schema.kafka_consumer_status;
查询所有已启动的consumer task的同步状态。
字段含义为:
Consumer: consumer name
IP: consumer task运行在哪个节点(IP地址)。
Topic: kafka topic name
Status: start、stop两种状态[l3]。
Min_offset:当前kafka队列里的最小offset,如果成功连接到了kafka server,这个字段是实际值,否则是0。
Max_offset: 当前kafka队列里的最大offset,如果成功连接到了kafka server,这个字段是实际值,否则是0。
Cur_offset:8a目前正在取得的消息的offset。
Process_offset: 8a正在执行同步的消息的offset。
Commit_offset: 8a最后一次执行commit操作时,对应的消息的offset,也就是说,当前同步或者消费到的offset。
Exception: 最后一次出现错误时的错误描述信息,异常包括:连不上kafka server、解析json消息出错以及详细的错误原因、目标表不存在、目标表列定义错误(列名、列数量、列顺序)、在集群层处理数据同步的时候发生错误、发给单机执行单机报错、提交错误。
Consumer task运行中的异常处理
如果遇到json消息解析出错(表定义不一致的情况例外),consumer task会转换到睡眠状态,查看运行状态可以在exception栏位看到具体是什么问题,其中包含出错消息对应的offset,用户应该stop这个consumer task,用kafka自带的consumer工具读取这个offset的消息,检查问题,kafka自带工具的用法:
$kafka_home/bin/kafka-simple-consumer-shell.sh --broker-list 192.168.103.74:9092[l4] --topic test3 --offset 797 --max-messages 1 --partition 0
如果遇到集群加锁失败或者自身节点离线,会退出自身节点负责的所有consumer task,等待consumer task调度器分配任务,这时候查看运行状态会显示:
status=waiting start,IP=unknown。
如果遇到gbase单机执行同步报错,consumer task会转换到睡眠状态[l5],用户应该根据异常描述,处理了相关问题后,重启consumer task。
下面简要介绍一下各种异常描述信息所代表的含义,以及用户应该采取什么措施来处理问题。
··json解析问题
异常信息为: consumer HANG UP due to parse kafka message error, you need modify the json and restart consumer.parse json error, kafka offset=8888, error=CJosn lib can't parse the json, error maybe (before): ||B":"2","D":"2"}}]||
表示Offset=8888这个消息有错误,对应的json为:
{"table":"TEST.t5","op_type":"I","pos":"00000000050095733212","primary_keys":["A"],"before":{},"after":{"A":"\","B":"2","D":"2"}},
可以看到"\"存在转义不当问题,正确格式是:"\\"。
提示错误位于||B":"2","D":"2"}}]||这段内容之前。
在UTF8模式下,OGG可以做到正确处理转义,这条错误json是用其他工具生成的。
··缺少primary key
异常信息为:consumer HANG UP due to parse kafka message error, you need modify the json and restart consumer.parse json error, kafka offset=8888, error=primary key can not be empty, table_name=TEST.T5
表示Offset=8888这个消息有错误,对应的json为:
{"table":"TEST.t5","op_type":"D","pos":"00000000050095733212","primary_keys":[],"before":{"A":"1"},"after":{"A":"1","B":"2","D":"2"}}
可以看到这是delete操作,要求primary key必须不为空。
··Primary key无效
异常信息为:consumer HANG UP due to parse kafka message error, you need modify the json and restart consumer.parse json error, kafka offset=8888, error=dml update: can not find primary key's value from before condition, table_name=TEST.T5
表示Offset=8888这个消息有错误,对应的json为:{"table":"TEST.t5","op_type":"D","pos":"00000000050095733212","primary_keys":["B"],"before":{"A":"1"},"after":{"A":"1","B":"2","D":"2"}},
Primary key=B,但是从before里找不到B对应的数据,说明是无效的primary key。
目标表表不存在
异常信息为:table TEST.T5 is not exist in gbase8A, please create table in gbase8a。
用户在8a建表后,去consumer所在的coordinator执行[l6]continue kafka consumer 即可继续同步数据。
··表定义不一致
异常信息为:consumer HANG UP due to parse kafka message error, you need modify the json and restart consumer.parse json error, kafka offset=8888, error=column wrong, table_name=TEST.T5 wrong column=[E]
··Primary key不固定
异常信息为:primary key changed!! table_name=TEST.T5, old primary key:[A], new primary key:[A,B]。
··数据入库错误
异常信息为: Find error done!kafka offset [8888], consumer HANG UP due to can not process this message. You need restart consumer., error=[192.168.6.162:5050](GBA-02AD-0005)Failed to query in gnode:
DETAIL: Data truncated for column 'D' at row 1
SQL: INSERT INTO "TEST"."T5_n1" ("A", "B", "D") VALUES ('1','2','2fffffffffffffffffff')
原因是2fffffffffffffffffff数据越界。
原文链接:https://www.gbase.cn/community/post/5412
更多精彩内容尽在南大通用GBase技术社区,南大通用致力于成为用户最信赖的数据库产品供应商。