2-Elasticsearch集群数据批量导入

声明:原创文章,转载请注明出处。http://www.jianshu.com/u/e02df63eaa87

1、数据形式

我们采用Person的作为数据出发点,将文件中的序列化为Json的Person对象导入Elasticsearch集群中。
本文中的代码详见:https://github.com/hawkingfoo/es-batch-import

1.1 数据类型
public class Person {
    private int pid;            // person id
    private int age;
    private boolean sex;
    private String name;
    private String addr;
}
1.2 序列化Json后的文件类型

Person.dat id与json串以\t作为分割。

0   {"pid":0,"age":41,"sex":true,"name":"Lucy","addr":"Shanghai"}
1   {"pid":1,"age":9,"sex":true,"name":"Jenny","addr":"Shenzhen"}
2   {"pid":2,"age":9,"sex":true,"name":"Lily","addr":"Tianjin"}
3   {"pid":3,"age":42,"sex":false,"name":"David","addr":"Guangzhou"}
4   {"pid":4,"age":40,"sex":true,"name":"Mary","addr":"Chongqing"}
5   {"pid":5,"age":3,"sex":true,"name":"Jenny","addr":"Guangzhou"}

2、ES建立index和mapping

建立5个分片1个副本的index,其中ES的type为infos,对应的mapping如下:

{
  "settings": {
    "index": {
      "creation_date": "1470300617555",
      "legacy": {
        "routing": {
          "hash": {
            "type": "org.elasticsearch.cluster.routing.DjbHashFunction"
          },
          "use_type": "false"
        }
      },
      "number_of_shards": "5",
      "number_of_replicas": "1",
      "uuid": "mJXGBmnYS12mXBo0aGrR3Q",
      "version": {
        "created": "1070099",
        "upgraded": "2030499"
      }
    }
  },
  "mappings": {
    "infos": {
      "_timestamp": {},
      "properties": {
        "sex": {
          "type": "boolean"
        },
        "name": {
          "index": "not_analyzed",
          "type": "string"
        },
        "pid": {
          "type": "integer"
        },
        "addr": {
          "index": "not_analyzed",
          "type": "string"
        },
        "age": {
          "type": "integer"
        }
      }
    }
  }
}

3、导入程序模块

3.1 流程图
导入模块

整个导入模块的流程图如上,Main创建ESClientBulkProcessor;读取Person.dat中的Json串,组成UpdateRequest后加入到BulkProcessor中,当BulkProcessor满足一定的写入条件后,会批量进行发送到ES集群。

3.2 ESClient建立

添加Maven依赖:

<dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>2.3.4</version>
</dependency>
// ESConfig
public class ESConfig {
    private String esClusterName;    // 集群名称
    private String esClusterAddress; // 集群地址
    private String esIndex;          // ES库
    private String esType;           // ES表
    private int batchSize;           // 批量导入大小
    private String filePath;         // 导入文件的路径
    private int esThreadNum;         // 导入到ES的并发数量
    private String localClientIP;    // 本机IP地址

    public String getEsClusterName() {
        return esClusterName;
    }

    public ESConfig setEsClusterName(String esClusterName) {
        this.esClusterName = esClusterName;
        return this;
    }

    public String getEsClusterAddress() {
        return esClusterAddress;
    }

    public ESConfig setEsClusterAddress(String esClusterAddress) {
        this.esClusterAddress = esClusterAddress;
        return this;
    }

    public String getEsIndex() {
        return esIndex;
    }

    public ESConfig setEsIndex(String esIndex) {
        this.esIndex = esIndex;
        return this;
    }

    public String getEsType() {
        return esType;
    }

    public ESConfig setEsType(String esType) {
        this.esType = esType;
        return this;
    }

    public int getBatchSize() {
        return batchSize;
    }

    public ESConfig setBatchSize(int batchSize) {
        this.batchSize = batchSize;
        return this;
    }

    public String getFilePath() {
        return filePath;
    }

    public ESConfig setFilePath(String filePath) {
        this.filePath = filePath;
        return this;
    }

    public int getEsThreadNum() {
        return esThreadNum;
    }

    public ESConfig setEsThreadNum(int esThreadNum) {
        this.esThreadNum = esThreadNum;
        return this;
    }

    public String getLocalClientIP() {
        return localClientIP;
    }

    public ESConfig setLocalClientIP(String localClientIP) {
        this.localClientIP = localClientIP;
        return this;
    }
}

ESClient:

public class ESClient {
    private static final Logger logger = LogManager.getLogger(ESClient.class);

    public BulkProcessor createBulkProcessor(ESConfig esConfig) {
        String clusterName = esConfig.getEsClusterName();
        String clusterAddr = esConfig.getEsClusterAddress();

        if (clusterName == null || clusterName.isEmpty()) {
            logger.error("invalid cluster name.");
            return null;
        }
        if (clusterAddr == null || clusterAddr.isEmpty()) {
            logger.info("invalid cluster address.");
            return null;
        }
        String[] addr = clusterAddr.split(":");
        if (addr.length != 2) {
            logger.info("invalid cluster address.");
            return null;
        }
        Settings settings = Settings.settingsBuilder()
                .put("cluster.name", clusterName)
                .put("cluster.transport.sniff", true)
                .put("index.refresh_interval", "60s")
                .build();
        // 创建 TransportClient
        TransportClient transportClient = new TransportClient.Builder()
                .settings(settings).build();

        List<InetSocketTransportAddress> addrList = new ArrayList<>();
        try {
            addrList.add(new InetSocketTransportAddress(InetAddress.getByName(addr[0]),
                    Integer.parseInt(addr[1])));
        } catch (Exception e) {
            logger.error("exception:", e);
            return null;
        }

        for (InetSocketTransportAddress address : addrList) {
            transportClient.addTransportAddress(address);
        }
        Client client = transportClient;

        // 初始化Bulk处理器
        BulkProcessor bulkProcessor = BulkProcessor.builder(
                client,
                new BulkProcessor.Listener() {
                    long begin;
                    long cost;
                    int count = 0;

                    @Override
                    public void beforeBulk(long executionId, BulkRequest bulkRequest) {
                        begin = System.currentTimeMillis();
                    }

                    @Override
                    public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) {
                        cost = (System.currentTimeMillis() - begin) / 1000;
                        count += bulkRequest.numberOfActions();
                        logger.info("bulk success. size:[{}] cost:[{}s]", count, cost);
                    }

                    @Override
                    public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable throwable) {
                        logger.error("bulk update has failures, will retry:" + throwable);
                    }
                })
                .setBulkActions(esConfig.getBatchSize())                    // 批量导入个数
                .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.MB))    // 满1MB进行导入
                .setConcurrentRequests(esConfig.getEsThreadNum())           // 并发数
                .setFlushInterval(TimeValue.timeValueSeconds(5))            // 冲刷间隔60s
                .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1), 3)) // 重试3次,间隔1s
                .build();
        return bulkProcessor;
    }
}

在3.1节中,我们曾提到过满足发送条件这个概念,对应于上面BulkProcessor中的3个set方法。分别是:

  • 当导入数据(UpdateRequest)的个数达到后,进行发送;
  • 当导入数据的大小达到1MB后,进行发送;
  • 当距离上一次发送超过60秒时,进行发送。
3.3 读取并组装UpdateRequest

ESImporter:

public class ESImporter {
    private static final Logger logger = LogManager.getLogger(ESImporter.class);
    
    public void importer(ESConfig esConfig) {

        File file = new File(esConfig.getFilePath());
        BufferedReader reader = null;
        // 创建BulkProcessor
        BulkProcessor bulkProcessor = new ESClient().createBulkProcessor(esConfig);
        if (bulkProcessor == null) {
            logger.error("create bulk processor failed.");
            return;
        }
        UpdateRequest updateRequest;
        String[] arrStr;
        try {
            reader = new BufferedReader(new FileReader(file));
            String tempString;
            // 一次读入一行,直到读入null为文件结束
            while ((tempString = reader.readLine()) != null) {
                arrStr = tempString.split("\t");
                if (arrStr.length != 2) {
                    continue;
                }
                updateRequest = new UpdateRequest(esConfig.getEsIndex(), esConfig.getEsType(), arrStr[0])
                        .doc(arrStr[1]).docAsUpsert(true);
                bulkProcessor.add(updateRequest);
            }
            reader.close();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (reader != null) {
                    reader.close();
                }
                if (bulkProcessor != null) {
                    bulkProcessor.awaitClose(1, TimeUnit.MINUTES);
                }
            } catch (Exception e) {
                // do nothing
            }
        }
    }
}

这个模块主要是读取文件中的Json行,组装成UpdateRequest后,加入到bulkProcessor中。

3.4 服务启动模块

ImportMain:

public class ImportMain {
    private static final Logger logger = LogManager.getLogger(ImportMain.class);

    public static void main(String[] args) {
        try {
            if (args.length < 1) {
                System.err.println("usage: <file_path>");
                System.exit(1);
            }
            ESConfig esConfig = new ESConfig()
                    .setEsClusterName("elasticsearch")
                    .setEsClusterAddress("127.0.0.1:9300")
                    .setEsIndex("person")
                    .setEsType("infos")
                    .setBatchSize(100)
                    .setFilePath(args[0])
                    .setEsThreadNum(1);
           
            long begin = System.currentTimeMillis();
            ESImporter esImporter = new ESImporter();
            esImporter.importer(esConfig);
            long cost = System.currentTimeMillis() - begin;
            logger.info("import end. cost:[{}ms]", cost);
        } catch (Exception e) {
            logger.error("exception:", e);
        }
    }
}
3.5 代码目录
代码目录
3.6 ES集群查看

导入结束后,在ES集群上可以看到导入的docs。


docs
data
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容