1.pom.xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.2.3.RELEASE</version>
</dependency>
2.Application.yml
spring:
  kafka:
    bootstrap-servers: 10.254.8.29:9092
    listener:
      concurrency: 3
    consumer:
      group-id: GP_DATAMALL_DEFAULT_CONSUMER
      key-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      auto-offset-reset: earliest
3.定义kafka消费基础类
@Configuration
public class BaseKafkaComsumerService {
protected final static Loggerlogger = LoggerFactory.getLogger(BaseKafkaComsumerService.class);
/**
* 消费组的开头
*/
private static final String GROUP_START ="GP-";
/**
* 话题前缀
*/
protected static final String TOPIC_START ="TP-";
/**
* 最大消费量
*/
@Value("${dataservice.consumer.consume.max:1}")
private IntegerCONSUME_MAX =1;
@Value("${spring.kafka.bootstrap-servers:localhost:9092}")
private StringkafkaServers;
@Value("${dataservice.consumer.poll.timeout.ms:2000}")
private LongpollTimeout;
@Value("${dataservice.consumer.auto.commit.interval.ms:1000}")
private StringcommitInterval;
@Value("${dataservice.consumer.session.timeout.ms:15000}")
private StringsessionTimeout;
@Value("${dataservice.consumer.max.partition.fetch.bytes:1048576}")
private StringmaxPartitionFetchBytes;
@Value("${dataservice.consumer.request.timeout:16000}")
private IntegerrequestTimeout;
@Autowired
private AsyncServiceasyncService;
@Autowired
protected ClientOffsetServiceclientOffsetService;
/**
* 获取客户端的消费组
* @param clientId
* @return
*/
protected StringgetConsumerGroup(String clientId){
return new StringBuilder().append(GROUP_START).append(clientId).toString();
}
/**
 * Reads the newest record(s) of partition 0 of a topic.
 *
 * <p>The consumer is assigned partition 0, positioned one record before the
 * current end offset, and polled once. Up to {@code CONSUME_MAX} records are
 * collected, and the offset of the last collected record is stored through
 * {@code clientOffsetService}.
 *
 * @param consumer consumer used for the read; it is not closed here
 * @param groupId  consumer group under which the offset is stored
 * @param topic    topic to read
 * @return a success result wrapping the collected records and the last offset
 *         ({@code -1} when nothing was read), or a failure result with code
 *         {@code 10000001} when reading throws
 * @throws Exception if storing the offset fails; exceptions raised while
 *                   reading are caught and reported as a failure result
 */
protected KafKaResult consumeHead(KafkaConsumer<String, String> consumer, String groupId, String topic) throws Exception {
    TopicPartition tp = new TopicPartition(topic, 0);
    // NOTE(review): the element type expected by ConsumeDataResult#setResult is not
    // visible here, so the list is deliberately left untyped.
    List baseResult = new ArrayList();
    // Offset of the last record handed back to the caller; -1 means "none".
    long lastOffset = -1L;
    try {
        consumer.assign(Arrays.asList(tp));
        Long maxOffset = this.getCurrentMaxOffset(consumer, tp);
        // Position one record before the end so the poll returns the newest record.
        setConsumerOffset(consumer, tp, maxOffset - 1);
        ConsumerRecords<String, String> records = consumer.poll(pollTimeout);
        for (ConsumerRecord<String, String> record : records) {
            baseResult.add(this.bulidConsumeBase(record.value(), record.offset()));
            lastOffset = record.offset();
            if (baseResult.size() >= CONSUME_MAX) {
                break;
            }
        }
        // One commit per poll is enough; committing after every record repeats the same request.
        if (!baseResult.isEmpty()) {
            consumer.commitAsync();
        }
    } catch (Exception e) {
        logger.error("获取topic数据失败", e);
        return KafKaResult.fail("10000001", "获取topic数据失败");
    }
    if (lastOffset >= 0) {
        clientOffsetService.saveOffset(groupId, topic, lastOffset);
    }
    ConsumeDataResult result = new ConsumeDataResult();
    result.setOffset(lastOffset);
    result.setResult(baseResult);
    return KafKaResult.success(result);
}
/**
* 从上次记录的开始消费
* @param groupId
* @param topic
* @return
*/
protected KafKaResultconsumeRemember(KafkaConsumer consumer,String groupId, String topic)throws Exception {
Long offset =clientOffsetService.queryOffset(groupId,topic,null);//查询客户端offset,根据实际项目中存储规则自己实现
logger.info("type:2 redis offset get.... groupId:{} topic:{} offset:{}",groupId,topic,offset);
if(offset ==null) {//防止起始位置小于offset值,接口永远拿不到数据
offset =0L;
}else {
offset = offset+1;
}
//排除超时异常
KafKaResult result = consumeOffset(consumer,groupId, topic,offset,null);
if(!result.isSuccess() && result.getCode().equals("10000012")) {
ConsumeDataResult resultData =new ConsumeDataResult();
resultData.setOffset(-1L);
resultData.setResult(null);
return KafKaResult.success(resultData);
}
return result;
}
/**
 * Reads records of partition 0 of a topic starting at a given offset.
 *
 * <p>The start offset is validated against the partition's current beginning
 * and end offsets. Records are then polled until {@code CONSUME_MAX} records
 * are collected, the end offset captured at the start of the call is
 * reached, {@code endOffset} is passed, or a poll returns nothing. The offset
 * of the last collected record is stored through {@code clientOffsetService}.
 *
 * @param consumer  consumer used for the read; it is not closed here
 * @param groupId   consumer group under which the offset is stored
 * @param topic     topic to read
 * @param offset    first offset to read; {@code null} is treated as 0, and 0
 *                  means "from the beginning of the partition"
 * @param endOffset last offset to read (inclusive); {@code null} means "up to
 *                  the end offset captured at the start of the call"
 * @return a success result with the collected records and last offset, or a
 *         failure result with code {@code 10000011} (start offset below the
 *         partition's beginning), {@code 10000012} (start offset at or past
 *         the end) or {@code 10000001} (reading threw)
 * @throws Exception if assigning the partition, querying offsets, seeking or
 *                   storing the offset fails
 */
protected KafKaResult consumeOffset(KafkaConsumer<String, String> consumer, String groupId, String topic, Long offset, Long endOffset) throws Exception {
    // NOTE(review): the element type expected by ConsumeDataResult#setResult is not
    // visible here, so the list is deliberately left untyped.
    List baseResult = new ArrayList();
    // Offset of the last record handed back to the caller; -1 means "none".
    long lastOffset = -1L;
    // A null start offset previously passed the lower-bound guard and then threw on
    // unboxing; treat it like 0 (read from the beginning).
    final long startOffset = (offset == null) ? 0L : offset;

    TopicPartition tp = new TopicPartition(topic, 0);
    consumer.assign(Arrays.asList(tp));

    // Lower bound: an explicit start offset must not precede the first retained record.
    Long beginOffset = getCurrentBeginOffset(consumer, tp);
    if (startOffset != 0L && beginOffset > startOffset) {
        ConsumeDataResult errorResult = new ConsumeDataResult();
        errorResult.setOffset(beginOffset);
        logger.error("输入offset小于实际topic中开始offset值. 输入Offset:{} topic实际Offset:{}", startOffset, beginOffset);
        return KafKaResult.fail("10000011", "输入offset小于实际topic中开始offset值." + " 输入Offset:" + startOffset + " topic实际Offset:" + beginOffset, errorResult);
    }

    // Upper bound: the start offset must be below the current end offset.
    Long maxOffset = this.getCurrentMaxOffset(consumer, tp);
    if (startOffset >= maxOffset) {
        ConsumeDataResult errorResult = new ConsumeDataResult();
        errorResult.setOffset(maxOffset - 1);
        logger.error("输入offset大于实际topic中开始offset值. 输入Offset:{} topic实际Offset:{}", startOffset, maxOffset - 1);
        return KafKaResult.fail("10000012", "输入offset大于实际topic中开始offset值. 输入Offset:" + startOffset + " topic实际Offset:" + (maxOffset - 1), errorResult);
    }

    if (startOffset == 0L) {
        consumer.seekToBeginning(Arrays.asList(tp));
    } else {
        setConsumerOffset(consumer, tp, startOffset);
    }
    logger.info("groupId:{} --topic:{} --offset:{} --endOffset:{} --maxOffset:{}", groupId, topic, startOffset, endOffset, maxOffset);
    final long upperOffset = (endOffset == null) ? maxOffset : endOffset;

    // Offset of the most recently inspected record, whether or not it was collected.
    long lastSeenOffset = -1L;
    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(pollTimeout);
            if (records.isEmpty()) {
                break;
            }
            boolean collectedAny = false;
            for (ConsumerRecord<String, String> record : records) {
                lastSeenOffset = record.offset();
                if (baseResult.size() >= CONSUME_MAX || lastSeenOffset >= maxOffset || lastSeenOffset > upperOffset) {
                    break;
                }
                baseResult.add(bulidConsumeBase(record.value(), record.offset()));
                lastOffset = record.offset();
                collectedAny = true;
            }
            // One commit per polled batch; committing after every record repeats the same request.
            if (collectedAny) {
                consumer.commitAsync();
            }
            if (baseResult.size() >= CONSUME_MAX || lastSeenOffset >= maxOffset || lastSeenOffset >= upperOffset) {
                break;
            }
        }
    } catch (Exception e) {
        logger.error("获取topic数据错误", e);
        return KafKaResult.fail("10000001", "获取topic数据错误");
    }

    ConsumeDataResult result = new ConsumeDataResult();
    result.setOffset(lastOffset);
    result.setResult(baseResult);
    if (lastOffset >= 0) {
        clientOffsetService.saveOffset(groupId, topic, lastOffset);
    }
    return KafKaResult.success(result);
}
/**
 * Wraps one record payload and its offset in a result object.
 *
 * @param data   raw record value, parsed with {@code JSONObject.parseObject}
 * @param offset offset of the record
 * @return a new {@code ConsumeDataBaseResult} holding the parsed data and the offset
 */
public ConsumeDataBaseResult bulidConsumeBase(String data, Long offset) {
    final ConsumeDataBaseResult wrapped = new ConsumeDataBaseResult();
    wrapped.setData(JSONObject.parseObject(data));
    wrapped.setOffset(offset);
    return wrapped;
}
/**
 * Moves the consumer's position on a partition, clamping negative offsets to 0.
 *
 * @param consumer  consumer to reposition
 * @param partition partition whose position is changed
 * @param offset    requested offset; values below 0 are replaced by 0
 */
private void setConsumerOffset(KafkaConsumer consumer, TopicPartition partition, Long offset) {
    final long target = Math.max(offset, 0L);
    consumer.seek(partition, target);
}
/**
* 获取当前消费的分区最大偏移量
* @param consumer
* @param tp
* @return
*/
private LonggetCurrentMaxOffset(KafkaConsumer consumer, TopicPartition tp){
Map map = consumer.endOffsets(Arrays.asList(tp));
if (CollectionUtils.isEmpty(map)) {
return 0L;
}
Long maxOffset = map.get(tp);
return maxOffset;
}
/**
* 获取当前消费的分区起始偏移量
* @param consumer
* @param tp
* @return
*/
private LonggetCurrentBeginOffset(KafkaConsumer consumer, TopicPartition tp){
Map map = consumer.beginningOffsets(Arrays.asList(tp));
if (CollectionUtils.isEmpty(map)) {
return 0L;
}
Long beginningOffset = map.get(tp);
return beginningOffset;
}
/**
 * Fills in the derived consume settings on a request, in place.
 *
 * <p>The group is set to the consumer group built from the request's client
 * id, and the topic is replaced by the same topic prefixed with
 * {@code TOPIC_START}.
 *
 * @param param request to update
 */
protected void transConsumeInfo(ConsumeDataParam param) {
    final String consumerGroup = getConsumerGroup(param.getClientId());
    param.setGroup(consumerGroup);
    final String prefixedTopic = TOPIC_START + param.getTopic();
    param.setTopic(prefixedTopic);
}
/**
* kafka消费属性
* @param group : 所属消费组
* @return
*/
private MapconsumerProps(String group) {
return consumerProps(group,true);
}
/**
 * Builds the Kafka consumer properties for a group.
 *
 * <p>Keys and values are deserialized as strings, {@code max.poll.records}
 * is set to {@code CONSUME_MAX}, and {@code auto.offset.reset} is fixed to
 * {@code earliest}. The remaining settings come from the injected
 * configuration fields of this class.
 *
 * @param group      consumer group id
 * @param autoCommit value for {@code enable.auto.commit}
 * @return a new, mutable consumer configuration map
 */
private Map<String, Object> consumerProps(String group, boolean autoCommit) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
    // Several numeric settings are held as Strings and passed through unchanged.
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, commitInterval);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);
    props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, CONSUME_MAX);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return props;
}
/**
 * Creates a new Kafka consumer for a group, with auto-commit disabled.
 *
 * <p>The caller owns the returned consumer and is responsible for closing it.
 *
 * @param groupId consumer group id
 * @return a new consumer whose keys and values are deserialized as strings
 */
protected KafkaConsumer<String, String> createConsumer(String groupId) {
    Map<String, Object> props = this.consumerProps(groupId, false);
    return new KafkaConsumer<>(props);
}
}
4.kafka消费服务实现类.
@Service
public class KafkaComsumerServiceImpl extends BaseKafkaComsumerService implements KafkaComsumerService {
@Autowired
private AsyncServiceasyncService;
/**
* 消费
*
* @param consumeDataParam
* @return
*/
public KafKaResult consumeData(ConsumeDataParam consumeDataParam)throws ServiceException {
super.transConsumeInfo(consumeDataParam);
Integer type = consumeDataParam.getType();
//1.生成consumer
KafkaConsumer consumer =super.createConsumer(consumeDataParam.getGroup());
//2.处理数据
try {
switch (type) {
case 0:
return super.consumeOffset(consumer, consumeDataParam.getGroup(), consumeDataParam.getTopic(), 0L, null);
case 1:
return super.consumeHead(consumer, consumeDataParam.getGroup(), consumeDataParam.getTopic());
case 3:
return super.consumeOffset(consumer, consumeDataParam.getGroup(), consumeDataParam.getTopic(), consumeDataParam.getOffset(), consumeDataParam.getEndOffset());
default:// 2:
return super.consumeRemember(consumer, consumeDataParam.getGroup(), consumeDataParam.getTopic());
}
}catch (Exception e) {
throw new ServiceException(e);
}finally {
this.asyncService.colse(consumer);
}
}
@Override
public KafKaResult consumeOffset(ConsumerOffsetParam param)throws ServiceException {
super.clientOffsetService.saveOffset(super.getConsumerGroup(param.getClientId()), TOPIC_START + param.getTopic(), param.getOffset());
return KafKaResult.success("");
}
}
异步类:调用kafka消费端关闭
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
 * Helper bean whose methods are annotated with {@code @Async}.
 *
 * <p>NOTE(review): asynchronous execution depends on Spring's async support
 * being enabled elsewhere in the application; that configuration is not
 * visible here.
 */
@Component
public class AsyncService {

    /**
     * Closes the given Kafka consumer; a {@code null} consumer is ignored.
     *
     * <p>The misspelled method name is kept because existing callers use it.
     *
     * @param consumer consumer to close, may be {@code null}
     */
    @Async
    public void colse(KafkaConsumer<?, ?> consumer) {
        if (consumer == null) {
            return;
        }
        consumer.close();
    }
}