How Storm inserts data into Elasticsearch
Method 1: the example provided by Storm
https://github.com/apache/storm/tree/master/external/storm-elasticsearch
Code:
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-elasticsearch</artifactId>
<version>1.1.0</version>
</dependency>
EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
EsIndexBolt indexBolt = new EsIndexBolt(esConfig, tupleMapper);
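Problem: it depends on an old version of Elasticsearch, and I did not manage to solve this.
The latest source code already contains a fix, but no jar has been published for it.
Using it would mean recompiling the Storm 1.1.0 code, so I gave up on this approach and used the method below.
Method 2: elasticsearch-hadoop
Question:
Does it only support Hadoop? Does it support Storm? Storm is what I need.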
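ES-Hadoop connects Elasticsearch and Hadoop seamlessly: data in HDFS can be imported into ES for analysis, and ES data can be exported to HDFS for backup and archiving. Notably, ES-Hadoop fully supports the Spark framework.
The supported frameworks (positions refer to the logo diagram) include:
- Spark (the star logo, top center)
- Hive (the bee-like logo, bottom left)
- Cascading (the logo with five vertical bars, top right). Cascading is the proven application development platform for building data applications on Hadoop.
- Storm (the lightning-bolt logo)
- and, of course, standard MapReduce.
Whichever framework is used to integrate with ES, the integration is concise.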
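Question:
To use this jar, do I have to pull in a whole series of related jars?
After testing: Hadoop itself does not need to be included, but the JSON and HTTP libraries do.
It is more hassle than I would like.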
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-hadoop -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
<version>5.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.codehaus.jackson/jackson-mapper-asl -->
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.8.8</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.8.8</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
<version>1.8.8</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-xc</artifactId>
<version>1.8.8</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.9.0</version>
</dependency>
<!--
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-elasticsearch</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
<version>5.5.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-storm</artifactId>
<version>5.5.1</version>
</dependency>
-->
<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>5.5.1</version>
</dependency>
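The JSON and HTTP dependencies above make sense once you recall that elasticsearch-hadoop talks to Elasticsearch over its REST interface rather than the transport protocol. As a rough illustration only (this is not the connector's internal code), the sketch below builds the newline-delimited JSON body that the `_bulk` REST endpoint accepts. The class name `BulkBodySketch`, the index `storm` and the type `docs` are hypothetical, and the documents are assumed to be already-serialized single-line JSON.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative only: builds a newline-delimited JSON body for the _bulk REST endpoint. */
final class BulkBodySketch {

    /**
     * Each document is preceded by an "index" action line, and every line
     * (including the last) ends with '\n', as the bulk API requires.
     * Assumes index, type and documents need no further JSON escaping.
     */
    static String buildBulkBody(String index, String type, List<String> jsonDocs) {
        String action = "{\"index\":{\"_index\":\"" + index + "\",\"_type\":\"" + type + "\"}}";
        StringBuilder body = new StringBuilder();
        for (String doc : jsonDocs) {
            body.append(action).append('\n');
            body.append(doc).append('\n');
        }
        return body.toString();
    }

    public static void main(String[] args) {
        List<String> docs = Arrays.asList(
                "{\"word\":\"storm\",\"count\":3}",
                "{\"word\":\"elasticsearch\",\"count\":5}");
        // The result would be POSTed to http://<host>:9200/_bulk (HTTP port, not 9300).
        System.out.print(buildBulkBody("storm", "docs", docs));
    }
}
```

A real client would also need proper JSON escaping and error handling for partial bulk failures, which is the kind of work the connector takes care of.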
Code implementation
Configuration file:
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
<version>5.5.1</version>
</dependency>
Method 3: the example officially provided by Elasticsearch
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-storm</artifactId>
<version>5.5.1</version>
</dependency>
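With this dependency, writing from a topology goes through the connector's `EsBolt`, which is configured with `es.*` settings. The sketch below only assembles such a settings map using the standard library so it can be run without Storm on the classpath. The class name `EsBoltSettingsSketch`, the chosen values and the target `storm/docs` mentioned in the comment are assumptions for illustration; check the ES-Hadoop Storm documentation for the settings your version supports.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: assembles connector settings that could be handed to EsBolt. */
final class EsBoltSettingsSketch {

    static Map<String, String> boltSettings(String nodes, boolean inputIsJson) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("es.nodes", nodes);                          // Elasticsearch host(s)
        conf.put("es.port", "9200");                          // REST port, not the 9300 transport port
        conf.put("es.input.json", Boolean.toString(inputIsJson)); // tuples already carry JSON strings
        conf.put("es.storm.bolt.write.ack", "true");          // ack tuples only after the write
        conf.put("es.storm.bolt.flush.entries.size", "500");  // example batch size, tune as needed
        return conf;
    }

    public static void main(String[] args) {
        Map<String, String> conf = boltSettings("localhost", true);
        // With elasticsearch-storm on the classpath this map would be passed as:
        //   builder.setBolt("es-bolt", new EsBolt("storm/docs", conf)).shuffleGrouping("spout");
        // where "storm/docs" is a hypothetical index/type target.
        conf.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```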
Reading the code:
Key class: TransportClient
Elasticsearch uses standard RESTful APIs and JSON.
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));
SearchResponse sr = client.prepareSearch()
.setQuery(QueryBuilders.matchQuery("message", "myProduct"))
.addAggregation(AggregationBuilders.terms("top_10_states")
.field("state").size(10))
.execute().actionGet();
client.close();
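Since Elasticsearch also exposes the same search through REST and JSON, it can help to see what the builder calls above correspond to as a request body. The sketch below hand-assembles a match query with a terms aggregation as a JSON string that could be sent to the `_search` endpoint. The class name `SearchBodySketch` is hypothetical, and the inputs are assumed to contain no characters that need JSON escaping.

```java
/** Illustrative only: the REST/JSON counterpart of the TransportClient search. */
final class SearchBodySketch {

    /** Builds a match query plus a terms aggregation as a JSON request body. */
    static String matchWithTermsAgg(String field, String text,
                                    String aggName, String aggField, int size) {
        return "{\"query\":{\"match\":{\"" + field + "\":\"" + text + "\"}},"
             + "\"aggs\":{\"" + aggName + "\":{\"terms\":{\"field\":\"" + aggField
             + "\",\"size\":" + size + "}}}}";
    }

    public static void main(String[] args) {
        // Same parameters as the TransportClient example.
        System.out.println(
                matchWithTermsAgg("message", "myProduct", "top_10_states", "state", 10));
    }
}
```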
Elasticsearch from Storm
http://blog.csdn.net/sunnyyoona/article/details/52860861
https://www.elastic.co/guide/en/elasticsearch/guide/current/dynamic-mapping.html
https://www.elastic.co/guide/en/elasticsearch/reference/2.4/dynamic-field-mapping.html#date-detection
References
Elasticsearch for Apache Hadoop
https://www.elastic.co/guide/en/elasticsearch/hadoop/current/storm.html
elasticsearch-storm
https://github.com/swapnilkumbhar1602/Storm_to_ElasticSearch_Kibana
storm-elasticsearch
https://github.com/apache/storm/tree/master/external/storm-elasticsearch
Mapping and Types
https://www.elastic.co/guide/en/elasticsearch/hadoop/current/mapping.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html#_multi_fields_2
http://www.jianshu.com/p/ab99d2bcd63d
https://www.elastic.co/guide/en/elasticsearch/reference/2.3/ip.html
Date and time types
https://www.elastic.co/guide/en/elasticsearch/reference/2.3/date.html
https://www.elastic.co/guide/en/elasticsearch/reference/2.3/mapping-date-format.html#strict-date-time
http://blog.csdn.net/macavalier/article/details/17632491
https://stackoverflow.com/questions/29938237/java-8-date-and-time-api-parse-yyyy-mm-ddthhmmss-sssz
https://www.w3.org/TR/NOTE-datetime
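The date links above revolve around getting timestamps into an ISO 8601 shape that Elasticsearch can detect as a date. A minimal sketch with java.time, assuming the target is a millisecond-precision timestamp with a colon-separated offset (intended to line up with the strict_date_time format, which you should verify against your mapping). The class name `EsDateFormatSketch` is hypothetical.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Illustrative only: formats and parses ISO 8601 timestamps with milliseconds. */
final class EsDateFormatSketch {

    // "XXX" prints the offset as +08:00 and prints "Z" for a zero offset.
    static final DateTimeFormatter ISO_MILLIS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");

    static String format(long epochMillis, ZoneOffset offset) {
        return ISO_MILLIS.format(Instant.ofEpochMilli(epochMillis).atOffset(offset));
    }

    static long parseToEpochMillis(String text) {
        return OffsetDateTime.parse(text, ISO_MILLIS).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(format(0L, ZoneOffset.UTC));        // 1970-01-01T00:00:00.000Z
        System.out.println(format(0L, ZoneOffset.ofHours(8))); // 1970-01-01T08:00:00.000+08:00
        System.out.println(parseToEpochMillis("1970-01-01T08:00:00.000+08:00")); // 0
    }
}
```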
Connecting Storm to Kafka
// Key point
How Storm wraps the Kafka interface
Class: DynamicPartitionConnections