mysql_2_elasticsearch v1.0.7 版本更新


A customizable importer from mysql to elasticsearch.
可定制的 elasticsearch 数据导入工具

版本更新说明:

releases logs
v1.0.7 对不规范的时间日期字符串的宽容性优化;代码稳健性优化;导库任务结束后自动断开 Mysql 连接
v1.0.6 新增功能:支持使用 SQL 语句,将查询结果集导入 ES
v1.0.5 新增功能:配置项 exception_handler[field_name].writeAs 支持传递回调函数

主要功能

  1. 完全使用 JS 实现数据从 MySQL 到 elasticsearch 的迁移;
  2. 可一次性导入多张 MySQL 数据表和数据集合;
  3. 可自定义的数据迁移的规则(数据表/字段关系、字段过滤、使用正则进行异常处理);
  4. 可自定义的异步分片导入方式,数据导入效率更高。

一键安装

npm install mysql_2_elasticsearch

快速开始(简单用例)

var esMysqlRiver = require('mysql_2_elasticsearch');

var river_config = {
  mysql: {
    host: '127.0.0.1',
    user: 'root',
    password: 'root',
    database: 'users',
    port: 3306
  },
  elasticsearch: {
    host_config: {               // es客户端的配置参数
      host: 'localhost:9200',
      // log: 'trace'
    },
    index: 'myIndex'
  },
  riverMap: {
    'users => users': {}         // 将数据表 users 导入到 es 类型: /myIndex/users
  }
};


/*
** 以下代码内容:
** 通过 esMysqlRiver 方法进行数据传输,方法的回调参数(一个JSON对象) obj 包含此次数据传输的结果
** 其中:
** 1. obj.total    => 需要传输的数据表数量
** 2. obj.success  => 传输成功的数据表数量
** 3. obj.failed   => 传输失败的数据表数量
** 4. obj.result   => 本次数据传输的结论
*/

esMysqlRiver(river_config, function(obj) {
  /* 将传输结果打印到终端 */
  console.log('\n---------------------------------');
  console.log('总传送:' + obj.total + '项');
  console.log('成功:' + obj.success + '项');
  console.log('失败:' + obj.failed + '项');
  if (obj.result == 'success') {
    console.log('\n结论:全部数据传送完成!');
  } else {
    console.log('\n结论:传送未成功...');
  }
  console.log('---------------------------------');
  /* 将传输结果打印到终端 */
});

最佳实现(完整用例)

var esMysqlRiver = require('mysql_2_elasticsearch');

/*
** mysql_2_elasticsearch 的相关参数配置(详情见注释)
*/

var river_config = {

  /* [必需] MySQL数据库的相关参数(根据实际情况进行修改) */
  mysql: {
    host: '127.0.0.1',
    user: 'root',
    password: 'root',
    database: 'users',
    port: 3306
  },

  /* [必需] es 相关参数(根据实际情况进行修改) */
  elasticsearch: {
    host_config: {               // [必需] host_config 即 es客户端的配置参数,详细配置参考 es官方文档
      host: 'localhost:9200',
      log: 'trace',
      // Other options...
    },
    index: 'myIndex',            // [必需] es 索引名
    chunkSize: 8000,             // [非必需] 单分片最大数据量,默认为 5000 (条数据)
    timeout: '2m'                // [非必需] 单次分片请求的超时时间,默认为 1m
    //(注意:此 timeout 并非es客户端请求的timeout,后者请在 host_config 中设置)
  },

  /* [必需] 数据传送的规则 */
  riverMap: {
    'users => users': {            // [必需] 'a => b' 表示将 mysql数据库中名为 'a' 的 table 的所有数据 输送到 es中名为 'b' 的 type 中去
      filter_out: [                // [非必需] 需要过滤的字段名,即 filter_out 中的设置的所有字段将不会被导入 elasticsearch 的数据中
        'password',
        'age'
      ],
      exception_handler: {               // [非必需] 异常处理器,使用JS正则表达式处理异常数据,避免 es 入库时由于类型不合法造成数据缺失
        'birthday': [                    // [示例] 对 users 表的 birthday 字段的异常数据进行处理
          {
            match: /NaN/gi,              // [示例] 正则条件(此例匹配字段值为 "NaN" 的情况)
            writeAs: null                // [示例] 将 "NaN" 重写为 null
          },
          {
            match: /(\d{4})年/gi,        // [示例] 正则表达式(此例匹配字段值为形如 "2016年" 的情况)
            writeAs: '$1.1'              // [示例] 将 "2016年" 样式的数据重写为 "2016.1" 样式的数据
          },
          {
            match: /(\d{4})年/gi,        // [示例] 正则表达式(此例匹配字段值为形如 "2016年" 的情况)
            writeAs: function (word){    // [示例] 用回调函数处理数据
              return parseInt(word) + '.1';
            }
          }
        ]
      }
    },
    'favors => favors': {
      // [非必需] MySQL 查询语句,将查询结果导入名为 favors 的es类型中,若不配置此项,默认将数据表 favors 的数据导入 es 中
      SQL: 'SELECT favors.id,users.name,favors.favor FROM favors,users WHERE favors.user_id = users.id',
      filter_out: [ ... ],
      exception_handler: {
        ...
      }
    },
    // Other fields' options...
  }

};


/*
** 将传输结果打印到终端
*/

esMysqlRiver(river_config, function(obj) {
  console.log('\n---------------------------------');
  console.log('总传送:' + obj.total + '项');
  console.log('成功:' + obj.success + '项');
  console.log('失败:' + obj.failed + '项');
  if (obj.result == 'success') {
    console.log('\n结论:全部数据传送完成!');
  } else {
    console.log('\n结论:传送未成功...');
  }
  console.log('---------------------------------');
});

注意事项及参考

  1. elasticsearch 数据导入前请事先导入或配置好 index/type 的数据结构;
  2. host_config 参数设置详见 es官方文档
  3. mysql 表的自增 id 自动替换为 表名+_id 的格式,如:users_id
  4. 如因数据格式或内容问题导致的元数据导入缺失或失败,可通过设置 exception_handler 参数进行包容。

github 项目地址

https://github.com/parksben/mysql_2_elasticsearch

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,128评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,316评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,737评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,283评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,384评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,458评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,467评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,251评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,688评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,980评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,155评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,818评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,492评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,142评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,382评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,020评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,044评论 2 352

推荐阅读更多精彩内容