Background
At work, our product lines regularly need real-time data analysis and cross-center queries. The PostgreSQL databases we use make cross-database queries difficult, so we replicate PostgreSQL data into SnappyData in real time to support cross-center queries and real-time analytics.
One thing worth noting: operating SnappyData is quite painful. Its community is not very active, so when you hit a problem you mostly have to solve it yourself, and its SQL dialect is not as rich as that of mainstream databases.
We are therefore considering syncing the data into a designated PostgreSQL database instead, using schemas to separate the source databases (data origins) or product lines.
Pipeline Overview
Our real-time sync copies data from PostgreSQL to SnappyData, but you could just as well use StreamSets for MySQL-to-MySQL replication and more; its component library is rich enough to cover most common needs. Below I describe the first version of the sync pipeline. It has some problems, and should be considered no more than a working baseline for real-time data sync.
Prerequisite: the WAL logical-decoding plugin (wal2json, which produces the log format shown below) is installed so that PostgreSQL changes can be captured.
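For reference, here is a minimal sketch of the server-side preparation, assuming wal2json is already installed; the slot name sdc_slot and the connection details are placeholders, not values from our pipeline:

# Create a logical replication slot for the CDC stage to read from.
# Assumes postgresql.conf already contains:
#   wal_level = logical
#   max_replication_slots >= 1   (each pipeline needs its own slot)
#   max_wal_senders >= 1
import psycopg2

conn = psycopg2.connect(host='localhost', dbname='mydb',
                        user='postgres', password='secret')
conn.autocommit = True
with conn.cursor() as cur:
    cur.execute("SELECT * FROM pg_create_logical_replication_slot(%s, %s)",
                ('sdc_slot', 'wal2json'))
    print(cur.fetchone())  # returns (slot_name, consistent_point)
conn.close()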
Real-Time Data Sync Pipeline (Version 1)
Pipeline components:
1. PostgreSQL CDC Client
Purpose: connects to the database and reads the WAL log.
Configuration: the component defaults, plus the required Replication Slot, JDBC connection string, and username/password.
Note that if you want to see data changes immediately, set Max Batch Size to 1 (the default is 100).
We found the following bugs and issues in StreamSets 3.9.1:
1. Under high concurrency, records can be lost. I have reported this to StreamSets; they are working on a fix, which I expect to land around version 3.16. JIRA: https://issues.streamsets.com/browse/SDC-13269
2. In our tests, the component's table-filter option in 3.9.1 had no effect, so we implemented the table filtering ourselves in Python (see the Jython stage below).
3. The component cannot capture DDL statements; it only captures insert, update, and delete operations.
4. The Poll Interval setting must be smaller than wal_sender_timeout in the PostgreSQL configuration file (postgresql.conf).
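Regarding point 4, you can check the server's current timeout before choosing Poll Interval; a quick sketch (connection details are placeholders):

# Read wal_sender_timeout so Poll Interval can be set strictly below it.
import psycopg2

conn = psycopg2.connect(host='localhost', dbname='mydb',
                        user='postgres', password='secret')
with conn.cursor() as cur:
    cur.execute("SHOW wal_sender_timeout")
    print(cur.fetchone()[0])  # e.g. '60s', the PostgreSQL default
conn.close()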
2. Expression Evaluator
Purpose: adds the fields used for table filtering.
Note: this stage is optional; the filtering can instead be done in the Python script of stage 3.
3. Jython
Purpose: a Python script that parses the WAL log and flattens each change into a JSON record.
Notes:
A raw WAL entry for update/insert:
{
"xid": 1055891831,
"nextlsn": "130/6FC8A230",
"timestamp": "2020-01-08 15:00:[14.243564+08]",
"change": [{
"kind": "update",
"schema": "********",
"table": "********",
"columnnames": ["id", "create_user", "modify_user", "create_time", "modify_time", "app_id", "tenant_id", "deleted", "animal_id", "property_id", "property_val"],
"columntypes": ["bigint", "character varying(500)", "character varying(500)", "timestamp(6) without time zone", "timestamp(6) without time zone", "bigint", "bigint", "boolean", "bigint", "bigint", "character varying(500)"],
"columnvalues": [664478959718834178, "1182483438204788738", "1182483438204788738", "2020-01-08 14:42:[09.419]", "2020-01-08 15:00:[14.228968]", 502, 627092828622684160, false, 664478959546867712, 1158021187370930177, "2020-01-08 15:00:14"],
"oldkeys": {
"keynames": ["id"],
"keytypes": ["bigint"],
"keyvalues": [664478959718834178]
}
}, {
"kind": "insert",
"schema": "********",
"table": "********",
"columnnames": ["id", "create_user", "modify_user", "create_time", "modify_time", "app_id", "tenant_id", "deleted", "event_code", "event_date", "animal_id", "event_content", "org_id", "total_parity"],
"columntypes": ["bigint", "character varying(500)", "character varying(500)", "timestamp(6) without time zone", "timestamp(6) without time zone", "bigint", "bigint", "boolean", "character varying(32)", "timestamp(6) without time zone", "bigint", "jsonb", "bigint", "integer"],
"columnvalues": [664483509762727936, "1182483438204788738", "1182483438204788738", "2020-01-08 15:00:[14.234]", "2020-01-08 15:00:[14.234]", 503, 627092828622684160, false, "CR", "2020-01-08 15:00:14", 664478959546867712, "{\"reason\": \"4\", \"remark\": \"App登记\", \"isCancel\": 0}", 642422751306448896, null]
}]
}
A raw WAL entry for delete:
{
"xid": 1055894606,
"nextlsn": "130/700E9880",
"timestamp": "2020-01-08 15:18:53.051471+08",
"change": [{
"kind": "delete",
"schema": "*********",
"table": "*********",
"oldkeys": {
"keynames": ["id"],
"keytypes": ["bigint"],
"keyvalues": [631496870811664384]
}
}]
}
The Python script in the Jython stage:
for record in records:
    try:
        # Keep only the tables we want to sync ('class' and 'student');
        # the CDC stage's own table filter did not work in 3.9.1.
        tables = ['class', 'student']
        # 'change' is a list: one wal2json transaction may hold several changes
        changes = record.value['change']
        for change in changes:
            if change['table'] not in tables:
                continue
            record.value.clear()
            kind = change['kind']
            if kind != 'delete':
                columnnames_list = change['columnnames']
                columnvalues_list = change['columnvalues']
            else:
                # delete entries carry only the old key columns
                columnnames_list = change['oldkeys']['keynames']
                columnvalues_list = change['oldkeys']['keyvalues']
            for idx in range(len(columnnames_list)):
                record.value[columnnames_list[idx]] = columnvalues_list[idx]
            record.value['kind'] = kind
            record.value['table'] = change['table']
            record.value['schema'] = change['schema']
            record.value['database'] = '*********'
            output.write(record)
    except Exception as e:
        # Send record to error
        error.write(record, str(e))
The processed data (these records are not derived from the raw WAL entries above; they only illustrate the resulting format):
Insert/update record:
{"id":645986949249966087,"create_user":"1191899847279063041","modify_user":"1191899847279063041","create_time":"2019-11-18 14:01:[30.558","modify_time":"2020-01-08 13:54:[55.789116]","app_id":502,"tenant_id":626804036418404352,"org_id":641585797362876416,"deleted":false,"stage":"nursery","start_date":"2019-10-01 14:00:37","end_date":"2020-01-08 13:54:14","qty":1,"wgt":[33.33,],"start_wgt_date":null,"end_wgt_date":null,"start_wgt":[0.0,],"end_wgt":[0.0,],"pre_id":0,"kind":"update","table":"********","schema":"********"}
Delete record:
{"id":618383552928546816,"kind":"delete","table":"********","schema":"********"}
The screenshot below is a data snapshot of the pipeline: the left side shows the raw WAL records, the right side the records output by the Jython stage.
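If you want to sanity-check the flattening logic outside StreamSets, here is a minimal standalone sketch; SimpleRecord and the sample payload are stand-ins for the StreamSets record API, not part of the actual pipeline:

# Standalone test of the wal2json flattening logic; no StreamSets required.
class SimpleRecord(object):
    def __init__(self, value):
        self.value = value

# A trimmed wal2json payload shaped like the delete sample above.
sample = {'change': [{
    'kind': 'delete',
    'schema': 'public',
    'table': 'student',
    'oldkeys': {'keynames': ['id'], 'keytypes': ['bigint'],
                'keyvalues': [631496870811664384]}
}]}

record = SimpleRecord(sample)
flattened = []
for change in record.value['change']:
    if change['kind'] != 'delete':
        names, values = change['columnnames'], change['columnvalues']
    else:
        names, values = change['oldkeys']['keynames'], change['oldkeys']['keyvalues']
    flat = dict(zip(names, values))
    flat.update(kind=change['kind'], table=change['table'], schema=change['schema'])
    flattened.append(flat)

print(flattened)
# [{'id': 631496870811664384, 'kind': 'delete', 'table': 'student', 'schema': 'public'}]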
4. Stream Selector
Purpose: routes records into separate streams.
Note: a StreamSets JDBC Producer can perform only one of insert/update/delete, so you need three JDBC Producers to apply changes to the database; the Stream Selector routes each record to the right one (see the conditions sketched below).
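The routing conditions can be written in the StreamSets expression language against the kind field that the script added; a sketch (the stream numbering is illustrative):

${record:value('/kind') == 'insert'}   -> stream 1, feeding the insert JDBC Producer
${record:value('/kind') == 'update'}   -> stream 2, feeding the update JDBC Producer
${record:value('/kind') == 'delete'}   -> stream 3, feeding the delete JDBC Producer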
5. JDBC Producer
Purpose: writes to the target database.
Note: each instance performs only one operation type.
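For reference, the settings that differ across the three producers look roughly like this (property names as they appear in the JDBC Producer stage; the connection string and table expression are placeholders, not our production values):

JDBC Connection String: jdbc:snappydata://snappy-host:1527/
Table Name:             ${record:value('/table')}
Default Operation:      INSERT   (UPDATE / DELETE in the other two producers)

Because the script writes the source table name into each record, an expression in Table Name lets a single producer serve every whitelisted table instead of hard-coding one table per producer.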
With that, you have a basic real-time data sync. If you would rather not configure it yourself, I provide the pipeline's JSON export; import it into StreamSets, adjust the parameters, and it is ready to use.
Baidu Cloud link (not yet uploaded)
This version has the following problem:
Each pipeline occupies one PostgreSQL replication slot. If another pipeline needs the same database's data (for example, a real-time wide table), it has to open yet another slot to capture the changes again, which degrades PostgreSQL performance.
So instead I capture the WAL log into Kafka once, where multiple pipelines can consume it.
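The idea looks like this when sketched outside StreamSets (inside StreamSets it would simply be a CDC-to-Kafka pipeline; the topic, slot name, and connection details below are placeholders):

# Read wal2json changes from one replication slot and publish them to
# Kafka, so that many pipelines can consume without extra slots.
import psycopg2
import psycopg2.extras
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
conn = psycopg2.connect(host='localhost', dbname='mydb',
                        user='postgres', password='secret',
                        connection_factory=psycopg2.extras.LogicalReplicationConnection)
cur = conn.cursor()
cur.start_replication(slot_name='sdc_slot', decode=True)

def publish(msg):
    # msg.payload is one wal2json document, like the samples above
    producer.send('pg_wal_changes', msg.payload.encode('utf-8'))
    msg.cursor.send_feedback(flush_lsn=msg.data_start)  # acknowledge the LSN

cur.consume_stream(publish)  # blocks, streaming changes as they commit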
To be continued; updates to follow.