Flink 将数据 sink 到需要账号密码认证的 Elasticsearch

话不多说,直接上代码。

/*********************************** 写数据到ElasticSearch ***************************************/

//从配置文件中获取es的地址

ListhttpHosts =new ArrayList<>();

httpHosts.add(new HttpHost(parameterTool.getRequired("ELASTICSEARCH_HOST"),9200,"http"));

/*//从配置文件中读取 bulk flush size,代表一次批处理的数量int bulkSize = parameterTool.getInt("ELASTICSEARCH_BULK_FLUSH_MAX_ACTIONS", 40);*/

// 创建elasticsearch Sink

ElasticsearchSink.BuilderesSinkBuilder =new ElasticsearchSink.Builder<>(

httpHosts,

new ElasticsearchSinkFunction() {

public IndexRequest createIndexRequest(String element) {

Mapjson =new HashMap<>();

String line =element.substring(element.split("\\|\\^\\|")[0].length() +3);

KafkaEvent kafkaEvent =new KafkaEvent().fromString(line);

json.put("dts_id",kafkaEvent.getDid().toString());

json.put("business_time",kafkaEvent.getBt());

json.put("protocol_id",kafkaEvent.getPi());

json.put("user_view_status",kafkaEvent.getUvs().longValue());

return Requests.indexRequest()

.id(element.split("\\|\\^\\|")[0])

.index(parameterTool.getRequired("ES_INDEX_NAME"))

.type(parameterTool.getRequired("ES_INDEX_TYPE"))//ES_INDEX_TYPE

                        .source(json);

}

@Override

            public void process(String element,RuntimeContext ctx,RequestIndexer indexer) {

indexer.add(createIndexRequest(element));

}

}

);

// Maximum number of actions per bulk request; falls back to 40 when
// ELASTICSEARCH_BULK_FLUSH_MAX_ACTIONS is not configured.
final int bulkFlushMaxActions = parameterTool.getInt("ELASTICSEARCH_BULK_FLUSH_MAX_ACTIONS", 40);
esSinkBuilder.setBulkFlushMaxActions(bulkFlushMaxActions);

// Customize the REST client so every request carries the configured user name and password.
esSinkBuilder.setRestClientFactory(restClientBuilder -> {
    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();

    String userName = parameterTool.getRequired("ELASTICSEARCH_NAME");
    String password = parameterTool.getRequired("ELASTICSEARCH_PASSWD");
    credentialsProvider.setCredentials(
            AuthScope.ANY, new UsernamePasswordCredentials(userName, password));

    // Custom HTTP client configuration: auth caching is switched off and the
    // credentials provider above is installed as the default.
    // (A max-retry timeout of 2000 ms was left disabled in the original snippet.)
    restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
        httpClientBuilder.disableAuthCaching();
        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
    });
});

// Write the de-duplicated stream to Elasticsearch.
resulted.addSink(esSinkBuilder.build());

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • #!/usr/bin/env python # -*- coding: utf-8 -*- '''根据某个traceId去调...
    小七奇奇阅读 639评论 0 0
  • 此文是关于elasticsearch in action书部分重点读书笔记。 Chapter 2 Diving i...
    shamumu阅读 2,587评论 0 1
  • 平台内的产品有一个数据分析,统计平台内某个商户某个时间段内(今天、昨天、7天内、30天内……)的各种数据分析,这种...
    Chting阅读 644评论 1 3
  • 英文文档,一开始我也是抗拒的,边翻译边看,也就花费了1个小时基本就阅读过了,我的英文基础其实很差。附上链接:链接:...
    lonecolonel阅读 10,031评论 3 1
  • 1 注意import的StreamExecutionEnvironment // java 的头是 import ...
    君剑阅读 9,312评论 3 3