批量处理业务并支持事务

以下是企业级封装的完整代码,包括Spring Boot容器管理、配置化、监控、日志等最佳实践:

1. 配置类

@Configuration
@EnableConfigurationProperties(BatchProcessingProperties.class)
public class BatchProcessingConfig {
    
    @Bean("batchProcessorScheduler")
    public Scheduler batchProcessorScheduler(BatchProcessingProperties properties) {
        BatchProcessingProperties.SchedulerConfig config = properties.getScheduler();
        return Schedulers.newBoundedElastic(
            config.getCorePoolSize(),
            config.getQueueCapacity(),
            "batch-processor",
            config.getKeepAlive().toSeconds(),
            true
        );
    }
    
    @Bean("batchComputationScheduler")
    public Scheduler batchComputationScheduler(BatchProcessingProperties properties) {
        int parallelism = properties.getComputation().getParallelism();
        return Schedulers.newParallel("batch-computation", parallelism);
    }
    
    @Bean
    public Scheduler batchIoScheduler() {
        return Schedulers.boundedElastic();
    }
    
    @PreDestroy
    public void cleanupSchedulers() {
        Schedulers.shutdownNow();
    }
}

2. 配置属性类

@ConfigurationProperties(prefix = "app.batch-processing")
@Data
public class BatchProcessingProperties {
    
    private SchedulerConfig scheduler = new SchedulerConfig();
    private ComputationConfig computation = new ComputationConfig();
    private TimeoutConfig timeout = new TimeoutConfig();
    private RetryConfig retry = new RetryConfig();
    
    @Data
    public static class SchedulerConfig {
        private int corePoolSize = 10;
        private int maxPoolSize = 20;
        private int queueCapacity = 100;
        private Duration keepAlive = Duration.ofMinutes(1);
    }
    
    @Data
    public static class ComputationConfig {
        private int parallelism = Runtime.getRuntime().availableProcessors();
    }
    
    @Data
    public static class TimeoutConfig {
        private Duration processing = Duration.ofSeconds(30);
        private Duration batch = Duration.ofMinutes(5);
    }
    
    @Data
    public static class RetryConfig {
        private int maxAttempts = 3;
        private Duration backoffInitial = Duration.ofMillis(100);
        private double backoffMultiplier = 2.0;
    }
}

3. 核心服务接口

public interface BatchProcessingService<T, R> {
    
    /**
     * 批量处理数据
     * @param items 待处理的数据项列表
     * @return 处理结果
     */
    Mono<BatchResult<R>> processBatch(List<T> items);
    
    /**
     * 异步批量处理数据
     * @param items 待处理的数据项列表
     * @return 处理结果的Mono
     */
    Mono<BatchResult<R>> processBatchAsync(List<T> items);
}

4. 核心服务实现

@Slf4j
@Service
@Transactional
public class ReactiveBatchProcessingServiceImpl<T, R> implements BatchProcessingService<T, R> {
    
    private final Scheduler processorScheduler;
    private final Scheduler computationScheduler;
    private final Scheduler ioScheduler;
    private final BatchProcessingProperties properties;
    private final MeterRegistry meterRegistry;
    
    private final Function<T, Mono<R>> itemProcessor;
    private final Function<List<R>, BatchResult<R>> resultCombiner;
    private final Predicate<R> successPredicate;
    
    public ReactiveBatchProcessingServiceImpl(
            @Qualifier("batchProcessorScheduler") Scheduler processorScheduler,
            @Qualifier("batchComputationScheduler") Scheduler computationScheduler,
            @Qualifier("batchIoScheduler") Scheduler ioScheduler,
            BatchProcessingProperties properties,
            MeterRegistry meterRegistry,
            Function<T, Mono<R>> itemProcessor,
            Function<List<R>, BatchResult<R>> resultCombiner,
            Predicate<R> successPredicate) {
        
        this.processorScheduler = processorScheduler;
        this.computationScheduler = computationScheduler;
        this.ioScheduler = ioScheduler;
        this.properties = properties;
        this.meterRegistry = meterRegistry;
        this.itemProcessor = itemProcessor;
        this.resultCombiner = resultCombiner;
        this.successPredicate = successPredicate;
    }
    
    @Override
    public Mono<BatchResult<R>> processBatch(List<T> items) {
        long startTime = System.currentTimeMillis();
        String batchId = UUID.randomUUID().toString();
        
        log.info("Starting batch processing - batchId: {}, itemCount: {}", batchId, items.size());
        
        return Flux.fromIterable(items)
                .name("batch-items")
                .tag("batch.id", batchId)
                .metrics()
                .parallel()
                .runOn(computationScheduler)
                .flatMap(item -> processItemWithRetry(item, batchId)
                        .timeout(properties.getTimeout().getProcessing())
                        .onErrorResume(throwable -> {
                            log.warn("Item processing failed - batchId: {}, error: {}", 
                                    batchId, throwable.getMessage());
                            return Mono.error(new BatchItemProcessingException(
                                    "Failed to process item", throwable));
                        }))
                .sequential()
                .collectList()
                .map(results -> validateAndCombineResults(results, batchId))
                .doOnSuccess(result -> {
                    long duration = System.currentTimeMillis() - startTime;
                    log.info("Batch processing completed successfully - batchId: {}, duration: {}ms, successRate: {}%", 
                            batchId, duration, calculateSuccessRate(result));
                    recordMetrics("success", items.size(), duration);
                })
                .doOnError(throwable -> {
                    long duration = System.currentTimeMillis() - startTime;
                    log.error("Batch processing failed - batchId: {}, duration: {}ms", 
                            batchId, duration, throwable);
                    recordMetrics("failure", items.size(), duration);
                })
                .subscribeOn(processorScheduler)
                .timeout(properties.getTimeout().getBatch());
    }
    
    @Override
    public Mono<BatchResult<R>> processBatchAsync(List<T> items) {
        return processBatch(items)
                .subscribeOn(processorScheduler);
    }
    
    private Mono<R> processItemWithRetry(T item, String batchId) {
        return itemProcessor.apply(item)
                .retryWhen(Retry.backoff(
                        properties.getRetry().getMaxAttempts() - 1,
                        properties.getRetry().getBackoffInitial())
                        .maxBackoff(Duration.ofSeconds(5))
                        .filter(throwable -> !(throwable instanceof BatchItemProcessingException))
                        .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> 
                                new BatchProcessingException(
                                        "Retry exhausted for item after " + 
                                        properties.getRetry().getMaxAttempts() + " attempts", 
                                        retrySignal.failure())))
                .doOnSuccess(result -> log.debug("Item processed successfully - batchId: {}", batchId))
                .doOnError(error -> log.warn("Item processing error - batchId: {}, error: {}", 
                        batchId, error.getMessage()));
    }
    
    private BatchResult<R> validateAndCombineResults(List<R> results, String batchId) {
        long successCount = results.stream()
                .filter(successPredicate)
                .count();
        
        if (successCount < results.size()) {
            log.warn("Some items failed in batch - batchId: {}, successCount: {}, totalCount: {}", 
                    batchId, successCount, results.size());
            throw new BatchProcessingException(
                    String.format("Batch processing partially failed - success: %d/%d", 
                            successCount, results.size()));
        }
        
        return resultCombiner.apply(results);
    }
    
    private double calculateSuccessRate(BatchResult<R> result) {
        if (result.getTotalCount() == 0) return 100.0;
        return (double) result.getSuccessCount() / result.getTotalCount() * 100;
    }
    
    private void recordMetrics(String status, int itemCount, long duration) {
        Timer.Sample sample = Timer.start(meterRegistry);
        sample.stop(Timer.builder("batch.processing.duration")
                .tag("status", status)
                .register(meterRegistry));
        
        Counter.builder("batch.processing.items")
                .tag("status", status)
                .register(meterRegistry)
                .increment(itemCount);
    }
}

5. 异常定义

public class BatchProcessingException extends RuntimeException {
    public BatchProcessingException(String message) {
        super(message);
    }
    
    public BatchProcessingException(String message, Throwable cause) {
        super(message, cause);
    }
}

public class BatchItemProcessingException extends BatchProcessingException {
    public BatchItemProcessingException(String message, Throwable cause) {
        super(message, cause);
    }
}

6. 数据模型

@Data
@AllArgsConstructor
@NoArgsConstructor
public class BatchResult<T> {
    private long successCount;
    private long totalCount;
    private List<T> results;
    private LocalDateTime processedAt;
    
    public BatchResult(long successCount, long totalCount, List<T> results) {
        this.successCount = successCount;
        this.totalCount = totalCount;
        this.results = results;
        this.processedAt = LocalDateTime.now();
    }
    
    public double getSuccessRate() {
        return totalCount == 0 ? 0 : (double) successCount / totalCount * 100;
    }
}

@Data
@AllArgsConstructor
@NoArgsConstructor
public class BatchItem<T> {
    private String id;
    private T data;
    private Map<String, Object> metadata;
    
    public BatchItem(T data) {
        this.id = UUID.randomUUID().toString();
        this.data = data;
        this.metadata = new HashMap<>();
    }
}

7. 具体业务实现示例

@Service
public class UserBatchProcessingService {
    
    private final ReactiveBatchProcessingServiceImpl<BatchItem<User>, ProcessResult> batchService;
    
    public UserBatchProcessingService(
            @Qualifier("batchProcessorScheduler") Scheduler processorScheduler,
            @Qualifier("batchComputationScheduler") Scheduler computationScheduler,
            @Qualifier("batchIoScheduler") Scheduler ioScheduler,
            BatchProcessingProperties properties,
            MeterRegistry meterRegistry,
            UserRepository userRepository) {
        
        this.batchService = new ReactiveBatchProcessingServiceImpl<>(
                processorScheduler,
                computationScheduler,
                ioScheduler,
                properties,
                meterRegistry,
                item -> processUserItem(item, userRepository),
                this::combineUserResults,
                ProcessResult::isSuccess
        );
    }
    
    public Mono<BatchResult<ProcessResult>> processUsers(List<User> users) {
        List<BatchItem<User>> batchItems = users.stream()
                .map(BatchItem::new)
                .collect(Collectors.toList());
        return batchService.processBatch(batchItems);
    }
    
    private Mono<ProcessResult> processUserItem(BatchItem<User> item, UserRepository userRepository) {
        return Mono.fromCallable(() -> {
            try {
                User savedUser = userRepository.save(item.getData());
                return ProcessResult.success(savedUser.getId(), "User saved successfully");
            } catch (Exception e) {
                return ProcessResult.failed("Failed to save user: " + e.getMessage());
            }
        })
        .subscribeOn(Schedulers.boundedElastic());
    }
    
    private BatchResult<ProcessResult> combineUserResults(List<ProcessResult> results) {
        long successCount = results.stream().filter(ProcessResult::isSuccess).count();
        return new BatchResult<>(successCount, results.size(), results);
    }
}

8. 配置文件示例

app:
  batch-processing:
    scheduler:
      core-pool-size: 10
      queue-capacity: 100
      keep-alive: 60s
    computation:
      parallelism: 8
    timeout:
      processing: 30s
      batch: 5m
    retry:
      max-attempts: 3
      backoff-initial: 100ms
      backoff-multiplier: 2.0

management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,prometheus
  metrics:
    export:
      prometheus:
        enabled: true

9. 控制器层

@RestController
@RequestMapping("/api/batch")
@Slf4j
public class BatchProcessingController {
    
    private final UserBatchProcessingService userBatchService;
    
    public BatchProcessingController(UserBatchProcessingService userBatchService) {
        this.userBatchService = userBatchService;
    }
    
    @PostMapping("/users")
    public Mono<ResponseEntity<BatchResult<ProcessResult>>> processUsers(
            @RequestBody List<User> users) {
        
        return userBatchService.processUsers(users)
                .map(ResponseEntity::ok)
                .onErrorResume(throwable -> {
                    log.error("Batch processing error", throwable);
                    return Mono.just(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build());
                });
    }
    
    @PostMapping("/users/async")
    public Mono<ResponseEntity<Mono<BatchResult<ProcessResult>>>> processUsersAsync(
            @RequestBody List<User> users) {
        
        Mono<BatchResult<ProcessResult>> result = userBatchService.processUsers(users);
        return Mono.just(ResponseEntity.accepted().body(result));
    }
}

主要企业级特性

  1. 配置化管理:所有参数通过配置文件管理
  2. Spring容器管理:Scheduler由Spring容器创建和销毁
  3. 监控集成:集成Micrometer进行指标监控
  4. 日志记录:完整的操作日志和错误日志
  5. 异常处理:统一的异常处理机制
  6. 重试机制:支持配置化的重试策略
  7. 超时控制:防止任务长时间阻塞
  8. 泛型支持:可适用于各种业务场景
  9. 事务支持:确保数据一致性
  10. 资源管理:自动化的资源清理

这种实现方式完全符合企业级应用的标准,具有良好的可维护性、可配置性和可监控性。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容