import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
/**
 * Loads the rows of an Excel sheet into an Elasticsearch index through the transport client.
 *
 * <p>The connection settings are read from the Spring {@link Environment}
 * ({@code es.cluster-name}, {@code es.ip}, {@code es.port}). Every column of the sheet is mapped
 * as a {@code keyword} field and the rows are written with bulk requests.
 */
@Service
@Slf4j
public class AddEs {

    /** Number of documents sent per bulk request. */
    private static final int BULK_BATCH_SIZE = 1000;

    @Autowired
    private Environment env;

    /** Name used both as the index name and as the mapping type. */
    public String INDEX = "qppeople";

    /** Transport client; created by {@link #addTransportClient()}. */
    public TransportClient client;

    /**
     * Reads {@code properties/qppeople.xlsx}, makes sure the index and its mapping exist and
     * bulk-indexes every row.
     *
     * <p>Nothing is created when the sheet yields no rows, because the mapping is derived from the
     * keys of the first row.
     */
    public void ass() {
        List<Map<String, String>> rows = FileUtils.poiExcelAll(new File("properties/qppeople.xlsx"));
        if (rows == null || rows.isEmpty()) {
            log.warn("no rows read from properties/qppeople.xlsx, nothing to index into {}", INDEX);
            return;
        }

        // Open the connection.
        addTransportClient();

        // Create the index first. actionGet() blocks until the cluster has answered, so no polling
        // loop is needed; an already existing index is simply reused.
        if (!isExistsIndex(INDEX)) {
            client.admin().indices().create(new CreateIndexRequest(INDEX)).actionGet();
        }

        // Create the mapping from the column names of the first row.
        PutMappingRequest mapping =
                Requests.putMappingRequest(INDEX).type(INDEX).source(getMapping(rows.get(0)));
        client.admin().indices().putMapping(mapping).actionGet();

        // Insert the data.
        addData(rows);
    }

    /**
     * Checks whether an index exists.
     *
     * @param indexName name of the index to look up
     * @return {@code true} if the cluster reports the index as existing
     */
    public boolean isExistsIndex(String indexName) {
        IndicesExistsResponse response =
                client.admin().indices().exists(new IndicesExistsRequest(indexName)).actionGet();
        return response.isExists();
    }

    /**
     * Deletes an index and logs whether the cluster acknowledged the deletion.
     *
     * @param indexName name of the index to delete
     */
    public void deleteIndex(String indexName) {
        DeleteIndexResponse dResponse =
                client.admin().indices().prepareDelete(indexName).execute().actionGet();
        if (dResponse.isAcknowledged()) {
            log.info("delete index {} successfully!", indexName);
        } else {
            log.warn("Fail to delete index {}", indexName);
        }
    }

    /**
     * Creates the transport client from the {@code es.*} properties and stores it in
     * {@link #client}. A client left over from an earlier call is closed so that its resources
     * are not leaked.
     *
     * @throws IllegalStateException if the configured host cannot be resolved
     */
    public void addTransportClient() {
        Settings settings = Settings.builder()
                .put("cluster.name", env.getProperty("es.cluster-name"))
                .put("client.transport.sniff", true)
                .build();

        TransportAddress transportAddress;
        try {
            transportAddress = new TransportAddress(
                    InetAddress.getByName(env.getProperty("es.ip")),
                    Integer.parseInt(env.getProperty("es.port")));
        } catch (UnknownHostException e) {
            // Fail here with the cause attached instead of leaving client null for a later NPE.
            throw new IllegalStateException(
                    "cannot resolve Elasticsearch host " + env.getProperty("es.ip"), e);
        }

        TransportClient previous = client;
        client = new PreBuiltTransportClient(settings).addTransportAddress(transportAddress);
        if (previous != null) {
            previous.close();
        }
    }

    /**
     * Builds a mapping in which every key of the given map becomes a {@code keyword} field.
     *
     * @param keyMap a sample row; only its keys are used
     * @return the mapping as {@code {"properties": {<key>: {"type": "keyword"}, ...}}}
     * @throws IllegalStateException if the JSON builder reports an I/O error
     */
    public static XContentBuilder getMapping(Map<String, String> keyMap) {
        try {
            XContentBuilder mapping = jsonBuilder().startObject().startObject("properties");
            for (String key : keyMap.keySet()) {
                mapping.startObject(key).field("type", "keyword").endObject();
            }
            return mapping.endObject().endObject();
        } catch (IOException e) {
            throw new IllegalStateException("failed to build mapping", e);
        }
    }

    /**
     * Indexes the given rows, submitting one bulk request per {@value #BULK_BATCH_SIZE} documents.
     *
     * @param rows documents to index; {@code null} or empty means nothing is sent
     */
    public void addData(List<Map<String, String>> rows) {
        if (rows == null) {
            return;
        }
        BulkRequestBuilder bulkRequest = client.prepareBulk();
        for (Map<String, String> dataMap : rows) {
            bulkRequest.add(client.prepareIndex(INDEX, INDEX).setSource(dataMap));
            // Submit once per full batch.
            if (bulkRequest.numberOfActions() >= BULK_BATCH_SIZE) {
                submit(bulkRequest);
                bulkRequest = client.prepareBulk();
                log.info("----------------------入es");
            }
        }
        // Send the remainder, if any.
        submit(bulkRequest);
    }

    /**
     * Executes a bulk request unless it is empty (an empty bulk request is rejected by request
     * validation) and logs a warning when the response reports item failures.
     */
    private void submit(BulkRequestBuilder bulkRequest) {
        if (bulkRequest.numberOfActions() == 0) {
            return;
        }
        if (bulkRequest.execute().actionGet().hasFailures()) {
            log.warn("bulk request into index {} reported item failures", INDEX);
        }
    }
}
Java-ES
最后编辑于 :
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...