在日常的业务开发场景中,像 一个人有多套房子,多个住址 ,一篇文章中有多个评论这种需求还是非常常见的。当我们使用 Elasticsearch 来进行存储时, ES 的字段类型是 nested 类型 ,虽然这个效率不高。
当我们使用 canal 对数据进行增量同步到 ES 时,canal-adapter 是否是支持 nested 类型呢?
查看 issue 提问
好遗憾。大佬说暂时不支持。
在本想放弃的时候,看到 issue 中有位小伙伴说,配置 object 兼容 nested ,但是并没有给出解决方案。废话不多说,实践一把,走起。
1. 数据处理
1.1 创建 存在 字段类型为 nested 的索引 canal_test
首先创建 elasticsearch的索引名为 canal_test ,其中 addresses 字段是 nested 类型
curl -XPUT "http://localhost:9200/canal_test" -H 'Content-Type: application/json' -d'{ "settings": { "number_of_shards": 1 }, "mappings": { "dynamic": false, "properties": { "addresses": { "type": "nested", "properties": { "address": { "analyzer": "ik_max_word", "type": "text" }, "houseId": { "type": "long" }, "zxbs": { "type": "long" }, "jwhdm": { "type": "keyword" }, "id": { "type": "long" } } }, "death": { "type": "boolean" }, "gender": { "type": "keyword" }, "nation": { "type": "keyword" }, "zxbs": { "type": "keyword" }, "name": { "type": "keyword", "fields": { "fulltext": { "type": "text" } } }, "residentId": { "type": "long" }, "type": { "type": "long" } } }}'1.2 表结构
创建 两张表 t_address(地址表),t_rk (人口表)。一个人可以有多个地址,一对多的关系
CREATE TABLE `t_address` (`address` varchar(255) DEFAULT NULL,`houseId` int(11) DEFAULT NULL,`zxbs` int(11) DEFAULT NULL,`jwhdm` varchar(255) DEFAULT NULL,`id` int(11) DEFAULT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;CREATE TABLE `t_rk` (`residentId` int(11) NOT NULL AUTO_INCREMENT,`type` int(255) DEFAULT NULL,`zxbs` varchar(255) DEFAULT NULL,`name` varchar(255) DEFAULT NULL,`nation` varchar(255) DEFAULT NULL,`gender` varchar(255) DEFAULT NULL,`death` char(1) DEFAULT NULL,PRIMARY KEY (`residentId`)) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4;
1.3测试数据
INSERT INTO `demo`.`t_rk`(`residentId`, `type`, `zxbs`, `name`, `nation`, `gender`, `death`) VALUES (3, 1, '1', 'huang123', '113', '1', '1');INSERT INTO `demo`.`t_address`(`address`, `houseId`, `zxbs`, `jwhdm`, `id`) VALUES ('厦门', 12, 23, '23', 3);INSERT INTO `demo`.`t_address`(`address`, `houseId`, `zxbs`, `jwhdm`, `id`) VALUES ('漳州', 233, 23, '23', 3);INSERT INTO `demo`.`t_address`(`address`, `houseId`, `zxbs`, `jwhdm`, `id`) VALUES ('泉州', 233, 23, '23', 3);
2. canal-adapter 配置
默认已经熟悉 canal 和 canal-adapter 的使用。在同步到 es 中,我们知道需要为每个索引配置一份 yml 的配置文件,下面创建canal_test.yml 文件 ,同步配置如下
2.1 nested 配置的正确姿势
dataSourceKey: defaultDSdestination: examplegroupId: g1esMapping:_index: canal_test_type: _doc_id: _idupsert: true# pk: idsql: "SELECTt1.residentId AS _id,t1.`name`,t1.death,t1.gender,t1.nation,t1.type,t1.zxbs,CONCAT('[',c.address,']') AS addressesFROMt_rk t1LEFT JOIN (SELECTid,GROUP_CONCAT(JSON_OBJECT('address',address,'houseId',houseId)) AS addressFROMt_addressGROUP BYid) c ON c.id = t1.residentId"objFields:addresses: object# etlCondition: "where a.c_time>={}"commitBatch: 3000
* 重点关注
SELECTt1.residentId AS _id,t1.`name`,t1.death,t1.gender,t1.nation,t1.type,t1.zxbs,CONCAT('[',c.address,']') AS addressesFROMt_rk t1LEFT JOIN (SELECTid,GROUP_CONCAT(JSON_OBJECT('address',address,'houseId',houseId)) AS addressFROMt_addressGROUP BYid) c ON c.id = t1.residentId"
配置中的关键
objFields:addresses: object
2. 测试同步情况
* 获取第 1 步中准备的sql,执行测试数据 sql
canal 执行日志
3. 同步结果
4. 搜索验证
GET canal_test/_search{"query":{"bool": {"must": [{"nested": {"path": "addresses","query": {"bool": {"must": [{"match_phrase": {"addresses.address": "漳州"}}]}}}}]}}}
查询结果:
好了 ,今天的实践就到这里。你学废了吗?
这边格式有点奇怪,有需要可以到公众号看
https://mp.weixin.qq.com/s/jIVxxHp9GsE2WL2DNPsVJQ