Elasticsearch 按月分片索引管理方案

📋 方案概述

背景:原索引5亿数据,需要每月生成一个新索引,新增数据插入新索引,查询需要查询所有索引。

核心目标:业务代码零改动,使用别名实现透明化管理。


🎯 架构设计

索引结构

原索引:voice-data (5亿数据)
         ↓ 迁移
历史索引:voice-data-history (5亿数据,只读)
月度索引:voice-data-2025-11, voice-data-2025-12, ...

别名:voice-data (业务代码使用的名称)
      ├─ voice-data-history (读)
      ├─ voice-data-2025-11 (读)
      └─ voice-data-2025-12 (读+写✓)

数据流向

┌──────────────┐
│  业务应用层   │
│ 使用: voice-data │
└───────┬──────┘
        │ (别名透明路由)
        ↓
┌─────────────────────────┐
│   Elasticsearch 集群    │
│                         │
│  写入 → voice-data-2025-12 (is_write_index=true) │
│                         │
│  查询 → voice-data-history    │
│       → voice-data-2025-11    │
│       → voice-data-2025-12    │
└─────────────────────────┘

🔧 实施步骤

阶段一:准备工作(步骤1对业务无影响;步骤2、3涉及原索引的克隆与删除,需要在停写窗口或业务低峰期执行)

1. 创建首个月度索引

PUT /voice-data-2025-11
{
  "settings": {
    "number_of_shards": 5,
    "number_of_replicas": 1,
    "refresh_interval": "30s"
  },
  "mappings": {
    "properties": {
      "id": { "type": "keyword" },
      "timestamp": { "type": "date" },
      "content": { "type": "text" },
      "status": { "type": "keyword" }
    }
  }
}

2. 克隆原索引为历史索引

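# 方式A:使用 clone API(推荐,速度快)
# 前提:clone 要求源索引已设置为只读(index.blocks.write=true)且集群健康状态为 green,
# 否则请求会被拒绝;设置只读后对 voice-data 的写入会失败,因此需在停写窗口内执行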
POST /voice-data/_clone/voice-data-history

# 方式B:使用 reindex(适合需要数据过滤的场景)
POST /_reindex
{
  "source": { "index": "voice-data" },
  "dest": { "index": "voice-data-history" }
}

3. 删除原索引,创建别名

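# 删除原索引
# 注意:从删除完成到下方别名创建成功之间,voice-data 这个名称不存在,业务读写会失败;
# 若此期间有写入请求,ES 还可能自动创建同名索引,导致后续别名创建失败。
# 请在停写窗口内执行,或改为在同一个 _aliases 请求中使用 remove_index 动作配合 add 动作原子完成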
DELETE /voice-data

# 创建别名配置
POST /_aliases
{
  "actions": [
    {
      "add": {
        "index": "voice-data-history",
        "alias": "voice-data",
        "is_write_index": false
      }
    },
    {
      "add": {
        "index": "voice-data-2025-11",
        "alias": "voice-data",
        "is_write_index": true
      }
    }
  ]
}

4. 验证别名配置

# 查看别名
GET /_cat/aliases/voice-data?v

# 预期输出:
# alias       index                   is_write_index
# voice-data  voice-data-history      -
# voice-data  voice-data-2025-11      true

阶段二:业务代码验证(无需修改)

写入测试

// Business code is untouched: it still addresses the name "voice-data", which is now an alias.
IndexRequest request = new IndexRequest("voice-data");
request.source(XContentType.JSON,
    "timestamp", new Date(),
    "content", "测试数据");
client.index(request, RequestOptions.DEFAULT);

// Elasticsearch routes the write to voice-data-2025-11, the index flagged is_write_index=true.

查询测试

// ✅ 业务代码保持不变
SearchRequest request = new SearchRequest("voice-data");
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(QueryBuilders.matchAllQuery());
request.source(builder);

SearchResponse response = client.search(request, RequestOptions.DEFAULT);

// ES自动查询所有关联索引:
// - voice-data-history
// - voice-data-2025-11

💻 Java 自动化实现

1. Maven 依赖

<dependencies>
    <!-- Elasticsearch Java client (high-level REST client) -->
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.17.10</version>
    </dependency>

    <!-- Spring Boot core starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <version>2.7.14</version>
    </dependency>

    <!-- Spring MVC: needed by the @RestController / @PostMapping / @GetMapping manual-trigger endpoints -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>2.7.14</version>
    </dependency>

    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.28</version>
    </dependency>

    <!-- JSON processing -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.15.2</version>
    </dependency>

    <!-- JUnit 5 and @SpringBootTest support for the test class -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <version>2.7.14</version>
        <scope>test</scope>
    </dependency>
</dependencies>

2. Elasticsearch 配置类

package com.example.es.config;

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that exposes the Elasticsearch high-level REST client as a bean.
 *
 * <p>Host, port and scheme come from the {@code elasticsearch.host}, {@code elasticsearch.port}
 * and {@code elasticsearch.scheme} properties, falling back to {@code localhost}, {@code 9200}
 * and {@code http} when a property is absent.
 */
@Configuration
public class ElasticsearchConfig {

    /** Connect timeout applied to every request, in milliseconds. */
    private static final int CONNECT_TIMEOUT_MILLIS = 5000;

    /** Socket (read) timeout applied to every request, in milliseconds. */
    private static final int SOCKET_TIMEOUT_MILLIS = 60000;

    @Value("${elasticsearch.host:localhost}")
    private String host;

    @Value("${elasticsearch.port:9200}")
    private int port;

    @Value("${elasticsearch.scheme:http}")
    private String scheme;

    /**
     * Builds a client that talks to the single configured node.
     *
     * @return the client, configured with the connect and socket timeouts above
     */
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        HttpHost node = new HttpHost(host, port, scheme);
        return new RestHighLevelClient(
            RestClient.builder(node).setRequestConfigCallback(timeouts -> {
                timeouts.setConnectTimeout(CONNECT_TIMEOUT_MILLIS);
                return timeouts.setSocketTimeout(SOCKET_TIMEOUT_MILLIS);
            }));
    }
}

3. 索引管理服务类

package com.example.es.service;

import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.GetAliasesResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.PutMappingRequest;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.Set;

/**
 * Creates the monthly indices and moves the write flag of the shared alias onto the newest one.
 *
 * <p>Monthly index names have the form {@code <prefix>-<yyyy-MM>}, where the month is taken from
 * the JVM's current date. The prefix, alias name, shard count and replica count are read from the
 * {@code elasticsearch.index.*} properties.
 *
 * <p>All public methods report failure through their boolean return value (or the log) when the
 * client throws an {@link IOException}; other runtime exceptions raised by the client propagate
 * to the caller.
 */
@Slf4j
@Service
public class IndexManagementService {

    /** Suffix pattern of monthly index names, e.g. {@code 2025-12}. */
    private static final DateTimeFormatter YEAR_MONTH = DateTimeFormatter.ofPattern("yyyy-MM");

    @Autowired
    private RestHighLevelClient client;

    @Value("${elasticsearch.index.prefix:voice-data}")
    private String indexPrefix;

    @Value("${elasticsearch.index.alias:voice-data}")
    private String indexAlias;

    @Value("${elasticsearch.index.shards:5}")
    private int numberOfShards;

    @Value("${elasticsearch.index.replicas:1}")
    private int numberOfReplicas;

    /**
     * Creates the index for the current month with the configured settings and mapping.
     *
     * @return {@code true} only when a new index was created and acknowledged; {@code false} when
     *         the index already exists, the request was not acknowledged, or an I/O error occurred
     */
    public boolean createMonthlyIndex() {
        String indexName = getCurrentMonthIndexName();

        try {
            // Nothing to do when the index is already there.
            if (indexExists(indexName)) {
                log.warn("索引 {} 已存在,跳过创建", indexName);
                return false;
            }

            CreateIndexRequest request = new CreateIndexRequest(indexName);

            // Index-level settings.
            request.settings(Settings.builder()
                .put("index.number_of_shards", numberOfShards)
                .put("index.number_of_replicas", numberOfReplicas)
                .put("index.refresh_interval", "30s")
                .put("index.max_result_window", 10000)
            );

            // Field mapping.
            request.mapping(createMapping());

            CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);

            if (response.isAcknowledged()) {
                log.info("✅ 索引 {} 创建成功", indexName);
                return true;
            } else {
                log.error("❌ 索引 {} 创建失败", indexName);
                return false;
            }

        } catch (IOException e) {
            log.error("创建索引 {} 时发生异常", indexName, e);
            return false;
        }
    }

    /**
     * Points the alias' write flag at the current month's index.
     *
     * <p>All changes are sent as one aliases request: last month's index (when it exists) is
     * added with the write flag off, every other index that currently carries the write flag is
     * switched off as well, and the current month's index is added with the write flag on.
     * Clearing every stale write flag matters because an alias may have only one write index;
     * without it the request is rejected whenever the flag sits on an index other than last
     * month's (for example after a skipped rotation).
     *
     * @return {@code true} when the alias update was acknowledged, {@code false} otherwise
     */
    public boolean updateAliasForNewMonth() {
        String newIndex = getCurrentMonthIndexName();
        String lastMonthIndex = getLastMonthIndexName();

        try {
            IndicesAliasesRequest request = new IndicesAliasesRequest();

            // 1. Clear the write flag on last month's index, if that index exists.
            if (indexExists(lastMonthIndex)) {
                request.addAliasAction(aliasAction(lastMonthIndex, false));
                log.info("移除上月索引 {} 的写入标记", lastMonthIndex);
            }

            // 2. Clear the write flag on any other index that still holds it.
            demoteStaleWriteIndices(request, newIndex, lastMonthIndex);

            // 3. Attach the new index to the alias as the write index.
            request.addAliasAction(aliasAction(newIndex, true));

            AcknowledgedResponse response = client.indices().updateAliases(request, RequestOptions.DEFAULT);

            if (response.isAcknowledged()) {
                log.info("✅ 别名 {} 已更新,写入索引切换到 {}", indexAlias, newIndex);
                return true;
            } else {
                log.error("❌ 别名更新失败");
                return false;
            }

        } catch (IOException e) {
            log.error("更新别名时发生异常", e);
            return false;
        }
    }

    /**
     * Runs the full monthly rotation: ensure the month's index exists, switch the alias, then
     * write-block last month's index.
     *
     * <p>The rotation can be re-run safely. When the month's index already exists (for example
     * because an earlier run created it but failed on the alias step, or it was created
     * separately), creation is skipped and the alias switch is still performed.
     *
     * @return {@code true} when the index is in place and the alias update was acknowledged;
     *         a failure to write-block last month's index is logged but does not change the result
     */
    public boolean performMonthlyRotation() {
        log.info("========== 开始执行月度索引切换 ==========");

        // 1. Make sure the index for the new month exists.
        String newIndex = getCurrentMonthIndexName();
        if (indexExists(newIndex)) {
            log.info("索引 {} 已存在,跳过创建并继续切换别名", newIndex);
        } else if (!createMonthlyIndex()) {
            log.error("月度索引创建失败,中止切换流程");
            return false;
        }

        // 2. Move the write flag.
        boolean aliasUpdated = updateAliasForNewMonth();
        if (!aliasUpdated) {
            log.error("别名更新失败,但新索引已创建");
            return false;
        }

        // 3. Optional hardening: block writes on last month's index.
        String lastMonthIndex = getLastMonthIndexName();
        if (indexExists(lastMonthIndex)) {
            setIndexReadOnly(lastMonthIndex);
        }

        log.info("========== 月度索引切换完成 ==========");
        return true;
    }

    /**
     * Logs every index attached to the alias together with its write-index flag.
     * I/O errors are logged and swallowed.
     */
    public void printAliasInfo() {
        try {
            GetAliasesRequest request = new GetAliasesRequest(indexAlias);
            GetAliasesResponse response = client.indices().getAlias(request, RequestOptions.DEFAULT);

            Map<String, Set<AliasMetadata>> aliases = response.getAliases();

            log.info("========== 别名 {} 当前配置 ==========", indexAlias);
            aliases.forEach((indexName, aliasMetadataSet) ->
                aliasMetadataSet.forEach(aliasMetadata ->
                    log.info("索引: {}, 别名: {}, 写入索引: {}",
                        indexName, aliasMetadata.alias(),
                        // writeIndex() may be null when the flag was never set.
                        Boolean.TRUE.equals(aliasMetadata.writeIndex()) ? "✓" : "-")));
            log.info("==========================================");

        } catch (IOException e) {
            log.error("查询别名信息时发生异常", e);
        }
    }

    /**
     * Builds an "add" alias action binding {@code indexName} to the managed alias.
     *
     * @param indexName  concrete index to attach
     * @param writeIndex value of the {@code is_write_index} flag for that index
     */
    private IndicesAliasesRequest.AliasActions aliasAction(String indexName, boolean writeIndex) {
        return IndicesAliasesRequest.AliasActions.add()
            .index(indexName)
            .alias(indexAlias)
            .writeIndex(writeIndex);
    }

    /**
     * Appends write-flag removals for every index that is currently the alias' write index,
     * except the two indices the caller already handles.
     *
     * @param request        aliases request being assembled
     * @param newIndex       index about to become the write index (skipped)
     * @param lastMonthIndex last month's index (skipped; handled by the caller)
     * @throws IOException if the alias lookup fails
     */
    private void demoteStaleWriteIndices(IndicesAliasesRequest request, String newIndex, String lastMonthIndex)
            throws IOException {
        GetAliasesResponse current =
            client.indices().getAlias(new GetAliasesRequest(indexAlias), RequestOptions.DEFAULT);

        for (Map.Entry<String, Set<AliasMetadata>> entry : current.getAliases().entrySet()) {
            String indexName = entry.getKey();
            if (indexName.equals(newIndex) || indexName.equals(lastMonthIndex)) {
                continue;
            }
            boolean holdsWriteFlag = entry.getValue().stream()
                .anyMatch(metadata -> Boolean.TRUE.equals(metadata.writeIndex()));
            if (holdsWriteFlag) {
                request.addAliasAction(aliasAction(indexName, false));
                log.info("移除索引 {} 的写入标记", indexName);
            }
        }
    }

    /**
     * Sets {@code index.blocks.write=true} on the given index. Best effort: an I/O error is
     * logged and not propagated.
     */
    private void setIndexReadOnly(String indexName) {
        try {
            // The high-level client's putSettings() takes an UpdateSettingsRequest.
            org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest request =
                new org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest(indexName);

            request.settings(Settings.builder()
                .put("index.blocks.write", true)
                .build());

            AcknowledgedResponse response = client.indices().putSettings(request, RequestOptions.DEFAULT);

            if (response.isAcknowledged()) {
                log.info("✅ 索引 {} 已设置为只读", indexName);
            }
        } catch (IOException e) {
            log.error("设置索引 {} 只读时发生异常", indexName, e);
        }
    }

    /**
     * Checks whether an index exists.
     *
     * @return the server's answer; {@code false} when the check itself fails with an I/O error,
     *         so callers cannot distinguish "absent" from "unknown"
     */
    private boolean indexExists(String indexName) {
        try {
            GetIndexRequest request = new GetIndexRequest(indexName);
            return client.indices().exists(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("检查索引 {} 是否存在时发生异常", indexName, e);
            return false;
        }
    }

    /**
     * Builds the field mapping applied to every monthly index.
     *
     * <p>The {@code content} field references the {@code ik_max_word} / {@code ik_smart}
     * analyzers, so index creation presumably requires the IK analysis plugin on the cluster —
     * confirm before deploying.
     */
    private XContentBuilder createMapping() throws IOException {
        return XContentFactory.jsonBuilder()
            .startObject()
                .startObject("properties")
                    .startObject("id")
                        .field("type", "keyword")
                    .endObject()
                    .startObject("timestamp")
                        .field("type", "date")
                        .field("format", "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis")
                    .endObject()
                    .startObject("content")
                        .field("type", "text")
                        .field("analyzer", "ik_max_word")
                        .field("search_analyzer", "ik_smart")
                    .endObject()
                    .startObject("status")
                        .field("type", "keyword")
                    .endObject()
                    .startObject("userId")
                        .field("type", "keyword")
                    .endObject()
                    .startObject("createTime")
                        .field("type", "date")
                    .endObject()
                .endObject()
            .endObject();
    }

    /** Returns {@code <prefix>-<yyyy-MM>} for the current month. */
    private String getCurrentMonthIndexName() {
        return indexPrefix + "-" + LocalDate.now().format(YEAR_MONTH);
    }

    /** Returns {@code <prefix>-<yyyy-MM>} for the month before the current one. */
    private String getLastMonthIndexName() {
        return indexPrefix + "-" + LocalDate.now().minusMonths(1).format(YEAR_MONTH);
    }
}

4. 定时任务调度类

package com.example.es.scheduler;

import com.example.es.service.IndexManagementService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * Scheduled entry points for the index rotation: a monthly switch and a daily alias dump.
 */
@Slf4j
@Component
public class IndexRotationScheduler {

    /** Format of the trigger time written to the log. */
    private static final DateTimeFormatter TRIGGER_TIME_FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private IndexManagementService indexManagementService;

    /**
     * Runs the monthly rotation at 00:05 on the first day of every month.
     * Cron fields: second minute hour day-of-month month day-of-week.
     *
     * <p>A failed rotation or any exception triggers an alert notification; a successful one
     * logs the resulting alias layout and sends a success notification.
     */
    @Scheduled(cron = "0 5 0 1 * ?")
    public void monthlyIndexRotation() {
        String firedAt = LocalDateTime.now().format(TRIGGER_TIME_FORMAT);
        log.info("========================================");
        log.info("⏰ 定时任务触发时间: {}", firedAt);
        log.info("========================================");

        try {
            if (!indexManagementService.performMonthlyRotation()) {
                log.error("❌ 月度索引切换失败");
                sendAlertNotification("月度索引切换失败");
                return;
            }

            log.info("🎉 月度索引切换成功");
            // Show where the alias points after the switch.
            indexManagementService.printAliasInfo();
            sendSuccessNotification();

        } catch (Exception e) {
            log.error("❌ 月度索引切换过程中发生异常", e);
            sendAlertNotification("月度索引切换异常: " + e.getMessage());
        }
    }

    /**
     * Optional daily check at 01:00: logs the current alias layout.
     */
    @Scheduled(cron = "0 0 1 * * ?")
    public void dailyIndexHealthCheck() {
        log.info("执行每日索引健康检查...");
        indexManagementService.printAliasInfo();
    }

    /**
     * Success notification hook; currently only writes a log line.
     */
    private void sendSuccessNotification() {
        // TODO: implement real delivery (e-mail, DingTalk, WeCom, ...)
        log.info("📧 发送成功通知");
    }

    /**
     * Alert notification hook; currently only writes a log line.
     *
     * @param message alert text
     */
    private void sendAlertNotification(String message) {
        // TODO: implement real alerting
        log.error("🚨 发送告警通知: {}", message);
    }
}

5. 手动触发接口(便于测试)

package com.example.es.controller;

import com.example.es.service.IndexManagementService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.HashMap;
import java.util.Map;

/**
 * HTTP endpoints under {@code /api/index} for triggering and inspecting the index rotation by hand.
 */
@Slf4j
@RestController
@RequestMapping("/api/index")
public class IndexManagementController {

    @Autowired
    private IndexManagementService indexManagementService;

    /**
     * Triggers the full monthly rotation.
     *
     * @return a map with {@code success} and {@code message}; exceptions are caught and reported
     *         in the same shape
     */
    @PostMapping("/rotate")
    public Map<String, Object> manualRotation() {
        try {
            log.info("收到手动触发索引切换请求");
            boolean rotated = indexManagementService.performMonthlyRotation();
            return outcome(rotated, rotated ? "索引切换成功" : "索引切换失败");
        } catch (Exception e) {
            log.error("手动索引切换失败", e);
            return outcome(false, "索引切换异常: " + e.getMessage());
        }
    }

    /**
     * Creates the current month's index without touching the alias.
     *
     * @return a map with {@code success} and {@code message}
     */
    @PostMapping("/create")
    public Map<String, Object> createMonthlyIndex() {
        boolean created = indexManagementService.createMonthlyIndex();
        return outcome(created, created ? "索引创建成功" : "索引创建失败");
    }

    /**
     * Writes the current alias layout to the application log; the response only points there.
     */
    @GetMapping("/alias/info")
    public Map<String, Object> getAliasInfo() {
        indexManagementService.printAliasInfo();
        return outcome(true, "请查看日志输出");
    }

    /**
     * Liveness probe returning a fixed status and the current epoch milliseconds.
     */
    @GetMapping("/health")
    public Map<String, Object> healthCheck() {
        Map<String, Object> body = new HashMap<>();
        body.put("status", "ok");
        body.put("timestamp", System.currentTimeMillis());
        return body;
    }

    /** Builds the {@code success}/{@code message} response body shared by the endpoints. */
    private static Map<String, Object> outcome(boolean success, String message) {
        Map<String, Object> body = new HashMap<>();
        body.put("success", success);
        body.put("message", message);
        return body;
    }
}

6. 配置文件

# application.yml
server:
  port: 8080

spring:
  application:
    name: elasticsearch-index-manager
  # 启用定时任务
  task:
    scheduling:
      enabled: true

# Elasticsearch配置
elasticsearch:
  host: localhost
  port: 9200
  scheme: http
  
  # 索引配置
  index:
    prefix: voice-data        # 索引前缀
    alias: voice-data         # 别名(业务使用的名称)
    shards: 5                 # 分片数
    replicas: 1               # 副本数

# 日志配置
logging:
  level:
    com.example.es: INFO
    org.elasticsearch: WARN
  pattern:
    console: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"

7. 启动类

package com.example.es;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * Application entry point; {@code @EnableScheduling} activates the {@code @Scheduled} jobs.
 */
@EnableScheduling
@SpringBootApplication
public class ElasticsearchIndexManagerApplication {

    /** Lines printed to stdout once the application context has started. */
    private static final String[] STARTUP_BANNER = {
        "======================================",
        "  Elasticsearch 索引管理服务已启动",
        "  定时任务:每月1号 00:05 自动切换索引",
        "======================================"
    };

    public static void main(String[] args) {
        SpringApplication.run(ElasticsearchIndexManagerApplication.class, args);
        for (String line : STARTUP_BANNER) {
            System.out.println(line);
        }
    }
}

8. 测试类

package com.example.es;

import com.example.es.service.IndexManagementService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * Smoke tests that call each service operation against the configured cluster and print the
 * outcome. They make no assertions, so they only fail when a call throws.
 */
@SpringBootTest
public class IndexManagementTest {

    @Autowired
    private IndexManagementService indexManagementService;

    @Test
    public void testCreateMonthlyIndex() {
        report("创建索引结果: ", indexManagementService.createMonthlyIndex());
    }

    @Test
    public void testUpdateAlias() {
        report("更新别名结果: ", indexManagementService.updateAliasForNewMonth());
    }

    @Test
    public void testPerformRotation() {
        report("索引切换结果: ", indexManagementService.performMonthlyRotation());
    }

    @Test
    public void testPrintAliasInfo() {
        indexManagementService.printAliasInfo();
    }

    /** Prints the label followed by the success/failure word for the given outcome. */
    private static void report(String label, boolean succeeded) {
        System.out.println(label + (succeeded ? "成功" : "失败"));
    }
}

📊 运行效果演示

启动日志

2025-11-12 10:00:00 [main] INFO  c.e.es.ElasticsearchIndexManagerApplication - Starting...
2025-11-12 10:00:02 [main] INFO  o.e.c.RestHighLevelClient - Elasticsearch客户端初始化成功
======================================
  Elasticsearch 索引管理服务已启动
  定时任务:每月1号 00:05 自动切换索引
======================================

定时任务执行日志

========================================
⏰ 定时任务触发时间: 2025-12-01 00:05:00
========================================
========== 开始执行月度索引切换 ==========
✅ 索引 voice-data-2025-12 创建成功
移除上月索引 voice-data-2025-11 的写入标记
✅ 别名 voice-data 已更新,写入索引切换到 voice-data-2025-12
✅ 索引 voice-data-2025-11 已设置为只读
========== 月度索引切换完成 ==========
🎉 月度索引切换成功
========== 别名 voice-data 当前配置 ==========
索引: voice-data-history, 别名: voice-data, 写入索引: -
索引: voice-data-2025-11, 别名: voice-data, 写入索引: -
索引: voice-data-2025-12, 别名: voice-data, 写入索引: ✓
==========================================
📧 发送成功通知

手动触发测试

# 测试创建索引
curl -X POST http://localhost:8080/api/index/create

# 响应:
{
  "success": true,
  "message": "索引创建成功"
}

# 测试完整切换
curl -X POST http://localhost:8080/api/index/rotate

# 响应:
{
  "success": true,
  "message": "索引切换成功"
}

# 查看别名信息
curl http://localhost:8080/api/index/alias/info

# 响应:
{
  "success": true,
  "message": "请查看日志输出"
}

🔍 业务代码示例(无需修改)

数据写入

/**
 * Example business write path: documents are indexed through the {@code voice-data} name.
 */
@Service
public class VoiceDataService {

    @Autowired
    private RestHighLevelClient client;

    /**
     * Indexes one record under its own id via the {@code voice-data} alias.
     *
     * <p>NOTE(review): a write through an alias goes to the alias' current write index only
     * (voice-data-2025-12 in the article's example). Presumably re-saving an id that already
     * lives in an older monthly index would create a second document there rather than
     * overwrite the old one — confirm ids are never re-saved across months.
     *
     * @param data record to store
     * @throws IOException if the index request fails at the transport level
     */
    public void saveVoiceData(VoiceData data) throws IOException {
        IndexRequest request = new IndexRequest("voice-data"); // alias, not a concrete index
        request.id(data.getId());
        request.source(XContentType.JSON,
            "timestamp", data.getTimestamp(),
            "content", data.getContent(),
            "status", data.getStatus());

        client.index(request, RequestOptions.DEFAULT);
    }
}

数据查询

/**
 * Example business read path: searches go through the {@code voice-data} name.
 */
@Service
public class VoiceSearchService {

    @Autowired
    private RestHighLevelClient client;

    /**
     * Runs a match query on {@code content} and returns up to 100 hits, newest
     * {@code timestamp} first.
     *
     * <p>Because the request targets the alias, the search covers every index attached to it
     * (voice-data-history, voice-data-2025-11, voice-data-2025-12 in the article's example).
     *
     * @param keyword text to match against the {@code content} field
     * @return the hits converted by {@code parseSearchHits}
     * @throws IOException if the search request fails at the transport level
     */
    public List<VoiceData> searchVoiceData(String keyword) throws IOException {
        SearchSourceBuilder query = new SearchSourceBuilder()
            .query(QueryBuilders.matchQuery("content", keyword))
            .size(100)
            .sort("timestamp", SortOrder.DESC);

        SearchRequest request = new SearchRequest("voice-data").source(query); // alias
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);

        // Convert the raw hits into domain objects.
        return parseSearchHits(response.getHits());
    }
}

聚合统计

/**
 * Example aggregation path: statistics are computed through the {@code voice-data} name.
 */
@Service
public class VoiceStatisticsService {

    @Autowired
    private RestHighLevelClient client;

    /**
     * Buckets documents by calendar month of {@code timestamp} using a date histogram named
     * {@code monthly}; no hits are fetched (size 0).
     *
     * <p>The request targets the alias, so the aggregation runs across every attached index and
     * Elasticsearch merges the per-index results.
     *
     * @return the aggregation converted by {@code parseAggregation}
     * @throws IOException if the search request fails at the transport level
     */
    public Map<String, Long> getMonthlyStatistics() throws IOException {
        SearchSourceBuilder aggregationOnly = new SearchSourceBuilder()
            .size(0)
            .aggregation(
                AggregationBuilders.dateHistogram("monthly")
                    .field("timestamp")
                    .calendarInterval(DateHistogramInterval.MONTH));

        SearchRequest request = new SearchRequest("voice-data").source(aggregationOnly); // alias
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);

        // Convert the aggregation buckets into the result map.
        return parseAggregation(response.getAggregations());
    }
}

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容