📋 方案概述
背景:原索引5亿数据,需要每月生成一个新索引,新增数据插入新索引,查询需要查询所有索引。
核心目标:业务代码零改动,使用别名实现透明化管理。
🎯 架构设计
索引结构
原索引:voice-data (5亿数据)
↓ 迁移
历史索引:voice-data-history (5亿数据,只读)
月度索引:voice-data-2025-11, voice-data-2025-12, ...
别名:voice-data (业务代码使用的名称)
├─ voice-data-history (读)
├─ voice-data-2025-11 (读)
└─ voice-data-2025-12 (读+写✓)
数据流向
┌──────────────┐
│ 业务应用层 │
│ 使用: voice-data │
└───────┬──────┘
│ (别名透明路由)
↓
┌─────────────────────────┐
│ Elasticsearch 集群 │
│ │
│ 写入 → voice-data-2025-12 (is_write_index=true) │
│ │
│ 查询 → voice-data-history │
│ → voice-data-2025-11 │
│ → voice-data-2025-12 │
└─────────────────────────┘
🔧 实施步骤
阶段一:准备工作(业务无影响)
1. 创建首个月度索引
PUT /voice-data-2025-11
{
"settings": {
"number_of_shards": 5,
"number_of_replicas": 1,
"refresh_interval": "30s"
},
"mappings": {
"properties": {
"id": { "type": "keyword" },
"timestamp": { "type": "date" },
"content": { "type": "text" },
"status": { "type": "keyword" }
}
}
}
2. 克隆原索引为历史索引
# 方式A:使用 clone API(推荐,速度快)
POST /voice-data/_clone/voice-data-history
# 方式B:使用 reindex(适合需要数据过滤的场景)
POST /_reindex
{
"source": { "index": "voice-data" },
"dest": { "index": "voice-data-history" }
}
3. 删除原索引,创建别名
# 删除原索引
DELETE /voice-data
# 创建别名配置
POST /_aliases
{
"actions": [
{
"add": {
"index": "voice-data-history",
"alias": "voice-data",
"is_write_index": false
}
},
{
"add": {
"index": "voice-data-2025-11",
"alias": "voice-data",
"is_write_index": true
}
}
]
}
4. 验证别名配置
# 查看别名
GET /_cat/aliases/voice-data?v
# 预期输出:
# alias index is_write_index
# voice-data voice-data-history -
# voice-data voice-data-2025-11 true
阶段二:业务代码验证(无需修改)
写入测试
// ✅ 业务代码保持不变,仍使用 voice-data
IndexRequest request = new IndexRequest("voice-data")
.source(XContentType.JSON,
"timestamp", new Date(),
"content", "测试数据");
client.index(request, RequestOptions.DEFAULT);
// ES自动路由到 voice-data-2025-11(因为 is_write_index=true)
查询测试
// ✅ 业务代码保持不变
SearchRequest request = new SearchRequest("voice-data");
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(QueryBuilders.matchAllQuery());
request.source(builder);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// ES自动查询所有关联索引:
// - voice-data-history
// - voice-data-2025-11
💻 Java 自动化实现
1. Maven 依赖
<dependencies>
<!-- Elasticsearch Java Client -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.17.10</version>
</dependency>
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>2.7.14</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.28</version>
</dependency>
<!-- JSON处理 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
</dependencies>
2. Elasticsearch 配置类
package com.example.es.config;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ElasticsearchConfig {
@Value("${elasticsearch.host:localhost}")
private String host;
@Value("${elasticsearch.port:9200}")
private int port;
@Value("${elasticsearch.scheme:http}")
private String scheme;
@Bean
public RestHighLevelClient restHighLevelClient() {
return new RestHighLevelClient(
RestClient.builder(new HttpHost(host, port, scheme))
.setRequestConfigCallback(requestConfigBuilder ->
requestConfigBuilder
.setConnectTimeout(5000)
.setSocketTimeout(60000))
);
}
}
3. 索引管理服务类
package com.example.es.service;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.GetAliasesResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.PutMappingRequest;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.Set;
@Slf4j
@Service
public class IndexManagementService {
@Autowired
private RestHighLevelClient client;
@Value("${elasticsearch.index.prefix:voice-data}")
private String indexPrefix;
@Value("${elasticsearch.index.alias:voice-data}")
private String indexAlias;
@Value("${elasticsearch.index.shards:5}")
private int numberOfShards;
@Value("${elasticsearch.index.replicas:1}")
private int numberOfReplicas;
/**
* 创建月度索引
*/
public boolean createMonthlyIndex() {
String indexName = getCurrentMonthIndexName();
try {
// 检查索引是否已存在
if (indexExists(indexName)) {
log.warn("索引 {} 已存在,跳过创建", indexName);
return false;
}
// 创建索引
CreateIndexRequest request = new CreateIndexRequest(indexName);
// 设置索引配置
request.settings(Settings.builder()
.put("index.number_of_shards", numberOfShards)
.put("index.number_of_replicas", numberOfReplicas)
.put("index.refresh_interval", "30s")
.put("index.max_result_window", 10000)
);
// 设置映射
request.mapping(createMapping());
CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
if (response.isAcknowledged()) {
log.info("✅ 索引 {} 创建成功", indexName);
return true;
} else {
log.error("❌ 索引 {} 创建失败", indexName);
return false;
}
} catch (IOException e) {
log.error("创建索引 {} 时发生异常", indexName, e);
return false;
}
}
/**
* 更新别名配置
*/
public boolean updateAliasForNewMonth() {
String newIndex = getCurrentMonthIndexName();
String lastMonthIndex = getLastMonthIndexName();
try {
IndicesAliasesRequest request = new IndicesAliasesRequest();
// 1. 如果上月索引存在,移除其写入标记
if (indexExists(lastMonthIndex)) {
request.addAliasAction(
IndicesAliasesRequest.AliasActions.add()
.index(lastMonthIndex)
.alias(indexAlias)
.writeIndex(false)
);
log.info("移除上月索引 {} 的写入标记", lastMonthIndex);
}
// 2. 将新索引添加到别名,并设置为写入索引
request.addAliasAction(
IndicesAliasesRequest.AliasActions.add()
.index(newIndex)
.alias(indexAlias)
.writeIndex(true)
);
AcknowledgedResponse response = client.indices().updateAliases(request, RequestOptions.DEFAULT);
if (response.isAcknowledged()) {
log.info("✅ 别名 {} 已更新,写入索引切换到 {}", indexAlias, newIndex);
return true;
} else {
log.error("❌ 别名更新失败");
return false;
}
} catch (IOException e) {
log.error("更新别名时发生异常", e);
return false;
}
}
/**
* 执行月度索引切换(创建 + 更新别名)
*/
public boolean performMonthlyRotation() {
log.info("========== 开始执行月度索引切换 ==========");
// 1. 创建新月度索引
boolean indexCreated = createMonthlyIndex();
if (!indexCreated) {
log.error("月度索引创建失败,中止切换流程");
return false;
}
// 2. 更新别名配置
boolean aliasUpdated = updateAliasForNewMonth();
if (!aliasUpdated) {
log.error("别名更新失败,但新索引已创建");
return false;
}
// 3. 可选:将上月索引设置为只读
String lastMonthIndex = getLastMonthIndexName();
if (indexExists(lastMonthIndex)) {
setIndexReadOnly(lastMonthIndex);
}
log.info("========== 月度索引切换完成 ==========");
return true;
}
/**
* 查看当前别名配置
*/
public void printAliasInfo() {
try {
GetAliasesRequest request = new GetAliasesRequest(indexAlias);
GetAliasesResponse response = client.indices().getAlias(request, RequestOptions.DEFAULT);
Map<String, Set<AliasMetadata>> aliases = response.getAliases();
log.info("========== 别名 {} 当前配置 ==========", indexAlias);
aliases.forEach((indexName, aliasMetadataSet) -> {
aliasMetadataSet.forEach(aliasMetadata -> {
Boolean isWriteIndex = aliasMetadata.writeIndex();
log.info("索引: {}, 别名: {}, 写入索引: {}",
indexName, aliasMetadata.alias(),
isWriteIndex != null && isWriteIndex ? "✓" : "-");
});
});
log.info("==========================================");
} catch (IOException e) {
log.error("查询别名信息时发生异常", e);
}
}
/**
* 设置索引为只读
*/
private void setIndexReadOnly(String indexName) {
try {
org.elasticsearch.client.indices.PutSettingsRequest request =
new org.elasticsearch.client.indices.PutSettingsRequest(indexName);
request.settings(Settings.builder()
.put("index.blocks.write", true)
.build());
AcknowledgedResponse response = client.indices().putSettings(request, RequestOptions.DEFAULT);
if (response.isAcknowledged()) {
log.info("✅ 索引 {} 已设置为只读", indexName);
}
} catch (IOException e) {
log.error("设置索引 {} 只读时发生异常", indexName, e);
}
}
/**
* 检查索引是否存在
*/
private boolean indexExists(String indexName) {
try {
GetIndexRequest request = new GetIndexRequest(indexName);
return client.indices().exists(request, RequestOptions.DEFAULT);
} catch (IOException e) {
log.error("检查索引 {} 是否存在时发生异常", indexName, e);
return false;
}
}
/**
* 创建索引映射
*/
private XContentBuilder createMapping() throws IOException {
return XContentFactory.jsonBuilder()
.startObject()
.startObject("properties")
.startObject("id")
.field("type", "keyword")
.endObject()
.startObject("timestamp")
.field("type", "date")
.field("format", "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis")
.endObject()
.startObject("content")
.field("type", "text")
.field("analyzer", "ik_max_word")
.field("search_analyzer", "ik_smart")
.endObject()
.startObject("status")
.field("type", "keyword")
.endObject()
.startObject("userId")
.field("type", "keyword")
.endObject()
.startObject("createTime")
.field("type", "date")
.endObject()
.endObject()
.endObject();
}
/**
* 获取当前月份的索引名称
*/
private String getCurrentMonthIndexName() {
String yearMonth = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM"));
return indexPrefix + "-" + yearMonth;
}
/**
* 获取上个月的索引名称
*/
private String getLastMonthIndexName() {
String yearMonth = LocalDate.now().minusMonths(1).format(DateTimeFormatter.ofPattern("yyyy-MM"));
return indexPrefix + "-" + yearMonth;
}
}
4. 定时任务调度类
package com.example.es.scheduler;
import com.example.es.service.IndexManagementService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
@Slf4j
@Component
public class IndexRotationScheduler {
@Autowired
private IndexManagementService indexManagementService;
/**
* 每月1号凌晨00:05执行索引切换
* cron表达式:秒 分 时 日 月 周
*/
@Scheduled(cron = "0 5 0 1 * ?")
public void monthlyIndexRotation() {
String timestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
log.info("========================================");
log.info("⏰ 定时任务触发时间: {}", timestamp);
log.info("========================================");
try {
// 执行月度索引切换
boolean success = indexManagementService.performMonthlyRotation();
if (success) {
log.info("🎉 月度索引切换成功");
// 打印当前别名配置
indexManagementService.printAliasInfo();
// 可选:发送成功通知
sendSuccessNotification();
} else {
log.error("❌ 月度索引切换失败");
// 发送告警通知
sendAlertNotification("月度索引切换失败");
}
} catch (Exception e) {
log.error("❌ 月度索引切换过程中发生异常", e);
sendAlertNotification("月度索引切换异常: " + e.getMessage());
}
}
/**
* 每天凌晨检查索引状态(可选)
*/
@Scheduled(cron = "0 0 1 * * ?")
public void dailyIndexHealthCheck() {
log.info("执行每日索引健康检查...");
indexManagementService.printAliasInfo();
}
/**
* 发送成功通知
*/
private void sendSuccessNotification() {
// TODO: 实现通知逻辑(邮件、钉钉、企业微信等)
log.info("📧 发送成功通知");
}
/**
* 发送告警通知
*/
private void sendAlertNotification(String message) {
// TODO: 实现告警逻辑
log.error("🚨 发送告警通知: {}", message);
}
}
5. 手动触发接口(便于测试)
package com.example.es.controller;
import com.example.es.service.IndexManagementService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@RestController
@RequestMapping("/api/index")
public class IndexManagementController {
@Autowired
private IndexManagementService indexManagementService;
/**
* 手动触发月度索引切换
*/
@PostMapping("/rotate")
public Map<String, Object> manualRotation() {
Map<String, Object> result = new HashMap<>();
try {
log.info("收到手动触发索引切换请求");
boolean success = indexManagementService.performMonthlyRotation();
result.put("success", success);
result.put("message", success ? "索引切换成功" : "索引切换失败");
return result;
} catch (Exception e) {
log.error("手动索引切换失败", e);
result.put("success", false);
result.put("message", "索引切换异常: " + e.getMessage());
return result;
}
}
/**
* 创建月度索引(仅创建,不切换别名)
*/
@PostMapping("/create")
public Map<String, Object> createMonthlyIndex() {
Map<String, Object> result = new HashMap<>();
boolean success = indexManagementService.createMonthlyIndex();
result.put("success", success);
result.put("message", success ? "索引创建成功" : "索引创建失败");
return result;
}
/**
* 查看当前别名配置
*/
@GetMapping("/alias/info")
public Map<String, Object> getAliasInfo() {
Map<String, Object> result = new HashMap<>();
indexManagementService.printAliasInfo();
result.put("success", true);
result.put("message", "请查看日志输出");
return result;
}
/**
* 健康检查
*/
@GetMapping("/health")
public Map<String, Object> healthCheck() {
Map<String, Object> result = new HashMap<>();
result.put("status", "ok");
result.put("timestamp", System.currentTimeMillis());
return result;
}
}
6. 配置文件
# application.yml
server:
port: 8080
spring:
application:
name: elasticsearch-index-manager
# 启用定时任务
task:
scheduling:
enabled: true
# Elasticsearch配置
elasticsearch:
host: localhost
port: 9200
scheme: http
# 索引配置
index:
prefix: voice-data # 索引前缀
alias: voice-data # 别名(业务使用的名称)
shards: 5 # 分片数
replicas: 1 # 副本数
# 日志配置
logging:
level:
com.example.es: INFO
org.elasticsearch: WARN
pattern:
console: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"
7. 启动类
package com.example.es;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@EnableScheduling
@SpringBootApplication
public class ElasticsearchIndexManagerApplication {
public static void main(String[] args) {
SpringApplication.run(ElasticsearchIndexManagerApplication.class, args);
System.out.println("======================================");
System.out.println(" Elasticsearch 索引管理服务已启动");
System.out.println(" 定时任务:每月1号 00:05 自动切换索引");
System.out.println("======================================");
}
}
8. 测试类
package com.example.es;
import com.example.es.service.IndexManagementService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class IndexManagementTest {
@Autowired
private IndexManagementService indexManagementService;
@Test
public void testCreateMonthlyIndex() {
boolean result = indexManagementService.createMonthlyIndex();
System.out.println("创建索引结果: " + (result ? "成功" : "失败"));
}
@Test
public void testUpdateAlias() {
boolean result = indexManagementService.updateAliasForNewMonth();
System.out.println("更新别名结果: " + (result ? "成功" : "失败"));
}
@Test
public void testPerformRotation() {
boolean result = indexManagementService.performMonthlyRotation();
System.out.println("索引切换结果: " + (result ? "成功" : "失败"));
}
@Test
public void testPrintAliasInfo() {
indexManagementService.printAliasInfo();
}
}
📊 运行效果演示
启动日志
2025-11-12 10:00:00 [main] INFO c.e.es.ElasticsearchIndexManagerApplication - Starting...
2025-11-12 10:00:02 [main] INFO o.e.c.RestHighLevelClient - Elasticsearch客户端初始化成功
======================================
Elasticsearch 索引管理服务已启动
定时任务:每月1号 00:05 自动切换索引
======================================
定时任务执行日志
========================================
⏰ 定时任务触发时间: 2025-12-01 00:05:00
========================================
========== 开始执行月度索引切换 ==========
✅ 索引 voice-data-2025-12 创建成功
移除上月索引 voice-data-2025-11 的写入标记
✅ 别名 voice-data 已更新,写入索引切换到 voice-data-2025-12
✅ 索引 voice-data-2025-11 已设置为只读
========== 月度索引切换完成 ==========
🎉 月度索引切换成功
========== 别名 voice-data 当前配置 ==========
索引: voice-data-history, 别名: voice-data, 写入索引: -
索引: voice-data-2025-11, 别名: voice-data, 写入索引: -
索引: voice-data-2025-12, 别名: voice-data, 写入索引: ✓
==========================================
📧 发送成功通知
手动触发测试
# 测试创建索引
curl -X POST http://localhost:8080/api/index/create
# 响应:
{
"success": true,
"message": "索引创建成功"
}
# 测试完整切换
curl -X POST http://localhost:8080/api/index/rotate
# 响应:
{
"success": true,
"message": "索引切换成功"
}
# 查看别名信息
curl http://localhost:8080/api/index/alias/info
# 响应:
{
"success": true,
"message": "请查看日志输出"
}
🔍 业务代码示例(无需修改)
数据写入
@Service
public class VoiceDataService {
@Autowired
private RestHighLevelClient client;
// ✅ 业务代码完全不用改,仍然使用 voice-data
public void saveVoiceData(VoiceData data) throws IOException {
IndexRequest request = new IndexRequest("voice-data") // 使用别名
.id(data.getId())
.source(XContentType.JSON,
"timestamp", data.getTimestamp(),
"content", data.getContent(),
"status", data.getStatus());
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
// ES自动路由到当前写入索引(voice-data-2025-12)
}
}
数据查询
@Service
public class VoiceSearchService {
@Autowired
private RestHighLevelClient client;
// ✅ 查询代码也不用改
public List<VoiceData> searchVoiceData(String keyword) throws IOException {
SearchRequest request = new SearchRequest("voice-data"); // 使用别名
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(QueryBuilders.matchQuery("content", keyword));
builder.size(100);
builder.sort("timestamp", SortOrder.DESC);
request.source(builder);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// ES自动查询所有关联索引:
// - voice-data-history
// - voice-data-2025-11
// - voice-data-2025-12
// 解析结果...
return parseSearchHits(response.getHits());
}
}
聚合统计
@Service
public class VoiceStatisticsService {
@Autowired
private RestHighLevelClient client;
// ✅ 聚合统计也无需修改
public Map<String, Long> getMonthlyStatistics() throws IOException {
SearchRequest request = new SearchRequest("voice-data"); // 使用别名
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.size(0);
builder.aggregation(
AggregationBuilders.dateHistogram("monthly")
.field("timestamp")
.calendarInterval(DateHistogramInterval.MONTH)
);
request.source(builder);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// ES自动在所有索引上执行聚合并合并结果
// 解析聚合结果...
return parseAggregation(response.getAggregations());
}
}