利用StreamSets实现MySQL中变化数据实时写入Kudu

  1. 环境准备
    1. 开启MariaDB的Binlog日志

      修改/etc/my.conf文件,在配置文件[mysqld]下增加如下配置

      server-id=999
      log-bin=mysql-bin
      binlog_format=ROW
      

      注意:
      MySQL Binlog支持多种数据更新格式包括Row、Statement和mix(Row和Statement的混合),这里建议使用Row模式的Binlog格式,可以更加方便实时的反应行级别的数据变化。

      binlog 配置参考:
      https://dev.mysql.com/doc/refman/5.7/en/binary-log-setting.html

      [root@node01 mariadb]# systemctl restart mysqld
      [root@node01 mariadb]#  systemctl status mysqld
      ● mysqld.service - LSB: start and stop MariaDB
         Loaded: loaded (/etc/rc.d/init.d/mysqld; bad; vendor preset: disabled)
         Active: active (running) since Tue 2020-04-28 09:59:03 CST; 1min 11s ago
           Docs: man:systemd-sysv-generator(8)
        Process: 12771 ExecStop=/etc/rc.d/init.d/mysqld stop (code=exited, status=0/SUCCESS)
        Process: 12925 ExecStart=/etc/rc.d/init.d/mysqld start (code=exited, status=0/SUCCESS)
         CGroup: /system.slice/mysqld.service
                 ├─12970 /bin/sh /usr/local/mariadb/bin/mysqld_safe --datadir=/usr/local/mariadb/data --pid-file=/usr/local/mariadb/data/node01.pid
                 └─13079 /usr/local/mariadb/bin/mysqld --basedir=/usr/local/mariadb --datadir=/usr/local/mariadb/data --plugin-dir=/usr/local/mariadb/lib/plugin --user=mysql --log-error=/usr/l...
      
      Apr 28 09:59:02 node01 systemd[1]: Starting LSB: start and stop MariaDB...
      Apr 28 09:59:02 node01 mysqld[12925]: Starting MariaDB.200428 09:59:02 mysqld_safe Logging to '/usr/local/mariadb/data/node01.err'.
      Apr 28 09:59:02 node01 mysqld[12925]: 200428 09:59:02 mysqld_safe Starting mysqld daemon with databases from /usr/local/mariadb/data
      Apr 28 09:59:03 node01 mysqld[12925]: [  OK  ]
      Apr 28 09:59:03 node01 systemd[1]: Started LSB: start and stop MariaDB.
      
    2. 创建 MariaDB 同步账号
      GRANT ALL on maxwell.* to 'maxwell'@'%' identified by '123456';
      GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
      FLUSH PRIVILEGES;
      
    3. StreamSets 安装 MySQL 驱动

      参考:
      https://blog.csdn.net/weixin_43215250/article/details/87981707

    4. 创建测试表
      1. 在MariaDB数据库中创建测试表
      CREATE DATABASE IF NOT EXISTS test;
      CREATE TABLE test.`binlog_test` (
        `id` int(11) NOT NULL AUTO_INCREMENT,
        `name` varchar(255) NOT NULL,
        PRIMARY KEY (`id`) USING BTREE
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
      
      1. 在 HUE 上创建 KUDU 表
      CREATE DATABASE IF NOT EXISTS test;
      CREATE TABLE IF NOT EXISTS test.binlog_test ( 
        id int, 
        name String,
        PRIMARY key(id)
      ) 
      PARTITION BY HASH PARTITIONS 16 
      STORED AS KUDU;
      
  2. 创建 StreamSets 的 Pipline
  1. 创建一个新的 Pipline
    image
  2. 选择 Origins 类别,搜索 MySQL Binary Log,并拖动到画布
    image

    配置 MySQL Binary Log 基本信息

    image

  **配置 MySQL 连接信息**
  
  **注意:** 此处配置的 Server ID 应与 MySQL 的 my.cnf 文件中的 server-id 保持一致。
  ![image](https://upload-images.jianshu.io/upload_images/18545623-f43db30aa5089c27?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

  **配置 MySQL 账号信息**
  ![image](https://upload-images.jianshu.io/upload_images/18545623-33d3abeb44e632e3?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

  **高级配置,根据自己的需要进行配置,这里采用默认**
  ![image](https://upload-images.jianshu.io/upload_images/18545623-59232a75d0526241?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
  
  **参考:** 
  https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/MySQLBinaryLog.html?hl=mysql%2Cbinary%2Clog
  1. 添加表过滤的Stream Selector 1
    image

    配置 Stream Selector 基本信息

    image

    配置分流条件
    ${record:value("/Table") == "binlog_test"}
    image

  2. 添加表过滤的 Stream Selector 1

    配置 Stream Selector 基本信息

    image

    配置分流条件
    ${record:value("/Table") == "DELETE"}

    image

    参考:
    https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Processors/StreamSelector.html?hl=stream%2Cselector

  1. 添加处理日志 JavaScript Evaluator

    添加解析 DELETE 类型的Binary Log 日志的 JavaScript Evaluator

    image

    配置JavaScript脚本

    for(var i = 0; i < records.length; i++) {
      try { 
        var newRecord = sdcFunctions.createRecord(true);
        newRecord.value = records[i].value['Data'];
        newRecord.value.Type = records[i].value['Type'];
        newRecord.value.Database = records[i].value['Database'];
        newRecord.value.Table = records[i].value['Table'];
        log.info(records[i].value['Type'])
        output.write(newRecord);
      } catch (e) {
        // Send record to error
        error.write(records[i], e);
      }
    }
    
    image
  **添加解析 INSRET 和 UPDATE 类型日志的 JavaScript Evaluator**
  ![image](https://upload-images.jianshu.io/upload_images/18545623-30378a88b8f7f613?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
  
  **配置JavaScript脚本**
  ```
  for(var i = 0; i < records.length; i++) {
    try { 
      var newRecord = sdcFunctions.createRecord(true);
      newRecord.value = records[i].value['OldData'];
      newRecord.value.Type = records[i].value['Type'];
      newRecord.value.Database = records[i].value['Database'];
      newRecord.value.Table = records[i].value['Table'];
      log.info(records[i].value['Type'])
      output.write(newRecord);
    } catch (e) {
      // Send record to error
      error.write(records[i], e);
    }
  }
  ```
  ![image](https://upload-images.jianshu.io/upload_images/18545623-8492b6202b22945e?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
  
  **参考:** 
  https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Processors/JavaScript.html?hl=javascript%2Cevaluator
  1. 添加 KUDU

    配置 Kudu Delete
    配置Kudu基本属性

    image

    配置Kudu环境信息
    image

    Kudu的高级配置,这里使用默认配置
    image

    配置 Kudu Upsert
    配置Kudu基本属性

    image

    配置Kudu环境信息


    image

    Kudu的高级配置,这里使用默认配置


    image
  2. 校验 Pipelines 配置
    image
  3. 启动 Pipelines
    image
  4. Pipeline 流程测试
    1. 新增数据

      向 MariaDB 中的 binlog_test 表中插入数据

      insert into test.binlog_test values(1, '张三');
      

      在 StreamSets 中查看的 Pipeline 状态

      image

      image

      使用Hue查看 Kudu 表数据,验证数据是否成功插入

      image

    2. 更新数据

      更新 MariaDB 中的 binlog_test 表中数据

      update test.binlog_test set name='李四' where id = 1;
      

      在 StreamSets 中查看的 Pipeline 状态

      image

      image

      使用Hue查看 Kudu 表数据,验证数据是否成功更新
      image

    3. 删除数据

      删除 MariaDB 中的 binlog_test 表中数据

      delete from test.binlog_test where id = 1;
      

      在 StreamSets 中查看的 Pipeline 状态

      image

      image

      使用Hue查看 Kudu 表数据,验证数据是否成功删除
      image

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,701评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,649评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,037评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,994评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,018评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,796评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,481评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,370评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,868评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,014评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,153评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,832评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,494评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,039评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,156评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,437评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,131评论 2 356

推荐阅读更多精彩内容