以下是企业级封装的完整代码,包括Spring Boot容器管理、配置化、监控、日志等最佳实践:
1. 配置类
@Configuration
@EnableConfigurationProperties(BatchProcessingProperties.class)
public class BatchProcessingConfig {
@Bean("batchProcessorScheduler")
public Scheduler batchProcessorScheduler(BatchProcessingProperties properties) {
BatchProcessingProperties.SchedulerConfig config = properties.getScheduler();
return Schedulers.newBoundedElastic(
config.getCorePoolSize(),
config.getQueueCapacity(),
"batch-processor",
config.getKeepAlive().toSeconds(),
true
);
}
@Bean("batchComputationScheduler")
public Scheduler batchComputationScheduler(BatchProcessingProperties properties) {
int parallelism = properties.getComputation().getParallelism();
return Schedulers.newParallel("batch-computation", parallelism);
}
@Bean
public Scheduler batchIoScheduler() {
return Schedulers.boundedElastic();
}
@PreDestroy
public void cleanupSchedulers() {
Schedulers.shutdownNow();
}
}
2. 配置属性类
@ConfigurationProperties(prefix = "app.batch-processing")
@Data
public class BatchProcessingProperties {
private SchedulerConfig scheduler = new SchedulerConfig();
private ComputationConfig computation = new ComputationConfig();
private TimeoutConfig timeout = new TimeoutConfig();
private RetryConfig retry = new RetryConfig();
@Data
public static class SchedulerConfig {
private int corePoolSize = 10;
private int maxPoolSize = 20;
private int queueCapacity = 100;
private Duration keepAlive = Duration.ofMinutes(1);
}
@Data
public static class ComputationConfig {
private int parallelism = Runtime.getRuntime().availableProcessors();
}
@Data
public static class TimeoutConfig {
private Duration processing = Duration.ofSeconds(30);
private Duration batch = Duration.ofMinutes(5);
}
@Data
public static class RetryConfig {
private int maxAttempts = 3;
private Duration backoffInitial = Duration.ofMillis(100);
private double backoffMultiplier = 2.0;
}
}
3. 核心服务接口
public interface BatchProcessingService<T, R> {
/**
* 批量处理数据
* @param items 待处理的数据项列表
* @return 处理结果
*/
Mono<BatchResult<R>> processBatch(List<T> items);
/**
* 异步批量处理数据
* @param items 待处理的数据项列表
* @return 处理结果的Mono
*/
Mono<BatchResult<R>> processBatchAsync(List<T> items);
}
4. 核心服务实现
@Slf4j
@Service
@Transactional
public class ReactiveBatchProcessingServiceImpl<T, R> implements BatchProcessingService<T, R> {
private final Scheduler processorScheduler;
private final Scheduler computationScheduler;
private final Scheduler ioScheduler;
private final BatchProcessingProperties properties;
private final MeterRegistry meterRegistry;
private final Function<T, Mono<R>> itemProcessor;
private final Function<List<R>, BatchResult<R>> resultCombiner;
private final Predicate<R> successPredicate;
public ReactiveBatchProcessingServiceImpl(
@Qualifier("batchProcessorScheduler") Scheduler processorScheduler,
@Qualifier("batchComputationScheduler") Scheduler computationScheduler,
@Qualifier("batchIoScheduler") Scheduler ioScheduler,
BatchProcessingProperties properties,
MeterRegistry meterRegistry,
Function<T, Mono<R>> itemProcessor,
Function<List<R>, BatchResult<R>> resultCombiner,
Predicate<R> successPredicate) {
this.processorScheduler = processorScheduler;
this.computationScheduler = computationScheduler;
this.ioScheduler = ioScheduler;
this.properties = properties;
this.meterRegistry = meterRegistry;
this.itemProcessor = itemProcessor;
this.resultCombiner = resultCombiner;
this.successPredicate = successPredicate;
}
@Override
public Mono<BatchResult<R>> processBatch(List<T> items) {
long startTime = System.currentTimeMillis();
String batchId = UUID.randomUUID().toString();
log.info("Starting batch processing - batchId: {}, itemCount: {}", batchId, items.size());
return Flux.fromIterable(items)
.name("batch-items")
.tag("batch.id", batchId)
.metrics()
.parallel()
.runOn(computationScheduler)
.flatMap(item -> processItemWithRetry(item, batchId)
.timeout(properties.getTimeout().getProcessing())
.onErrorResume(throwable -> {
log.warn("Item processing failed - batchId: {}, error: {}",
batchId, throwable.getMessage());
return Mono.error(new BatchItemProcessingException(
"Failed to process item", throwable));
}))
.sequential()
.collectList()
.map(results -> validateAndCombineResults(results, batchId))
.doOnSuccess(result -> {
long duration = System.currentTimeMillis() - startTime;
log.info("Batch processing completed successfully - batchId: {}, duration: {}ms, successRate: {}%",
batchId, duration, calculateSuccessRate(result));
recordMetrics("success", items.size(), duration);
})
.doOnError(throwable -> {
long duration = System.currentTimeMillis() - startTime;
log.error("Batch processing failed - batchId: {}, duration: {}ms",
batchId, duration, throwable);
recordMetrics("failure", items.size(), duration);
})
.subscribeOn(processorScheduler)
.timeout(properties.getTimeout().getBatch());
}
@Override
public Mono<BatchResult<R>> processBatchAsync(List<T> items) {
return processBatch(items)
.subscribeOn(processorScheduler);
}
private Mono<R> processItemWithRetry(T item, String batchId) {
return itemProcessor.apply(item)
.retryWhen(Retry.backoff(
properties.getRetry().getMaxAttempts() - 1,
properties.getRetry().getBackoffInitial())
.maxBackoff(Duration.ofSeconds(5))
.filter(throwable -> !(throwable instanceof BatchItemProcessingException))
.onRetryExhaustedThrow((retryBackoffSpec, retrySignal) ->
new BatchProcessingException(
"Retry exhausted for item after " +
properties.getRetry().getMaxAttempts() + " attempts",
retrySignal.failure())))
.doOnSuccess(result -> log.debug("Item processed successfully - batchId: {}", batchId))
.doOnError(error -> log.warn("Item processing error - batchId: {}, error: {}",
batchId, error.getMessage()));
}
private BatchResult<R> validateAndCombineResults(List<R> results, String batchId) {
long successCount = results.stream()
.filter(successPredicate)
.count();
if (successCount < results.size()) {
log.warn("Some items failed in batch - batchId: {}, successCount: {}, totalCount: {}",
batchId, successCount, results.size());
throw new BatchProcessingException(
String.format("Batch processing partially failed - success: %d/%d",
successCount, results.size()));
}
return resultCombiner.apply(results);
}
private double calculateSuccessRate(BatchResult<R> result) {
if (result.getTotalCount() == 0) return 100.0;
return (double) result.getSuccessCount() / result.getTotalCount() * 100;
}
private void recordMetrics(String status, int itemCount, long duration) {
Timer.Sample sample = Timer.start(meterRegistry);
sample.stop(Timer.builder("batch.processing.duration")
.tag("status", status)
.register(meterRegistry));
Counter.builder("batch.processing.items")
.tag("status", status)
.register(meterRegistry)
.increment(itemCount);
}
}
5. 异常定义
public class BatchProcessingException extends RuntimeException {
public BatchProcessingException(String message) {
super(message);
}
public BatchProcessingException(String message, Throwable cause) {
super(message, cause);
}
}
public class BatchItemProcessingException extends BatchProcessingException {
public BatchItemProcessingException(String message, Throwable cause) {
super(message, cause);
}
}
6. 数据模型
@Data
@AllArgsConstructor
@NoArgsConstructor
public class BatchResult<T> {
private long successCount;
private long totalCount;
private List<T> results;
private LocalDateTime processedAt;
public BatchResult(long successCount, long totalCount, List<T> results) {
this.successCount = successCount;
this.totalCount = totalCount;
this.results = results;
this.processedAt = LocalDateTime.now();
}
public double getSuccessRate() {
return totalCount == 0 ? 0 : (double) successCount / totalCount * 100;
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class BatchItem<T> {
private String id;
private T data;
private Map<String, Object> metadata;
public BatchItem(T data) {
this.id = UUID.randomUUID().toString();
this.data = data;
this.metadata = new HashMap<>();
}
}
7. 具体业务实现示例
@Service
public class UserBatchProcessingService {
private final ReactiveBatchProcessingServiceImpl<BatchItem<User>, ProcessResult> batchService;
public UserBatchProcessingService(
@Qualifier("batchProcessorScheduler") Scheduler processorScheduler,
@Qualifier("batchComputationScheduler") Scheduler computationScheduler,
@Qualifier("batchIoScheduler") Scheduler ioScheduler,
BatchProcessingProperties properties,
MeterRegistry meterRegistry,
UserRepository userRepository) {
this.batchService = new ReactiveBatchProcessingServiceImpl<>(
processorScheduler,
computationScheduler,
ioScheduler,
properties,
meterRegistry,
item -> processUserItem(item, userRepository),
this::combineUserResults,
ProcessResult::isSuccess
);
}
public Mono<BatchResult<ProcessResult>> processUsers(List<User> users) {
List<BatchItem<User>> batchItems = users.stream()
.map(BatchItem::new)
.collect(Collectors.toList());
return batchService.processBatch(batchItems);
}
private Mono<ProcessResult> processUserItem(BatchItem<User> item, UserRepository userRepository) {
return Mono.fromCallable(() -> {
try {
User savedUser = userRepository.save(item.getData());
return ProcessResult.success(savedUser.getId(), "User saved successfully");
} catch (Exception e) {
return ProcessResult.failed("Failed to save user: " + e.getMessage());
}
})
.subscribeOn(Schedulers.boundedElastic());
}
private BatchResult<ProcessResult> combineUserResults(List<ProcessResult> results) {
long successCount = results.stream().filter(ProcessResult::isSuccess).count();
return new BatchResult<>(successCount, results.size(), results);
}
}
8. 配置文件示例
app:
batch-processing:
scheduler:
core-pool-size: 10
queue-capacity: 100
keep-alive: 60s
computation:
parallelism: 8
timeout:
processing: 30s
batch: 5m
retry:
max-attempts: 3
backoff-initial: 100ms
backoff-multiplier: 2.0
management:
endpoints:
web:
exposure:
include: health,info,metrics,prometheus
metrics:
export:
prometheus:
enabled: true
9. 控制器层
@RestController
@RequestMapping("/api/batch")
@Slf4j
public class BatchProcessingController {
private final UserBatchProcessingService userBatchService;
public BatchProcessingController(UserBatchProcessingService userBatchService) {
this.userBatchService = userBatchService;
}
@PostMapping("/users")
public Mono<ResponseEntity<BatchResult<ProcessResult>>> processUsers(
@RequestBody List<User> users) {
return userBatchService.processUsers(users)
.map(ResponseEntity::ok)
.onErrorResume(throwable -> {
log.error("Batch processing error", throwable);
return Mono.just(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build());
});
}
@PostMapping("/users/async")
public Mono<ResponseEntity<Mono<BatchResult<ProcessResult>>>> processUsersAsync(
@RequestBody List<User> users) {
Mono<BatchResult<ProcessResult>> result = userBatchService.processUsers(users);
return Mono.just(ResponseEntity.accepted().body(result));
}
}
主要企业级特性
- 配置化管理:所有参数通过配置文件管理
- Spring容器管理:Scheduler由Spring容器创建和销毁
- 监控集成:集成Micrometer进行指标监控
- 日志记录:完整的操作日志和错误日志
- 异常处理:统一的异常处理机制
- 重试机制:支持配置化的重试策略
- 超时控制:防止任务长时间阻塞
- 泛型支持:可适用于各种业务场景
- 事务支持:确保数据一致性
- 资源管理:自动化的资源清理
这种实现方式完全符合企业级应用的标准,具有良好的可维护性、可配置性和可监控性。