在运行这个例子之前,需要先安装并启动 ZooKeeper 和 Kafka,然后按照下面的步骤操作。本例还演示了消费者处理消息失败后如何重试。
一、使用 Spring Initializr(https://start.spring.io/)添加 Kafka 依赖并生成项目,pom.xml 中的依赖如下:
<!-- Dependency section of pom.xml. Entries without a <version> rely on dependency
     management declared elsewhere in the POM (presumably the Spring Boot parent
     generated by Spring Initializr; not shown here). -->
<dependencies>
    <!-- Spring MVC / embedded web server starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Spring for Apache Kafka; its transitive kafka-clients is excluded so that the
         explicitly pinned version declared below is the one on the classpath. -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <!-- Kafka client library pinned to 2.0.0.
         NOTE(review): verify this version is compatible with the spring-kafka version in use. -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.0.0</version>
    </dependency>

    <!-- Spring Retry: provides the @Retryable / @Backoff annotations used by the listener -->
    <dependency>
        <groupId>org.springframework.retry</groupId>
        <artifactId>spring-retry</artifactId>
    </dependency>

    <!-- AspectJ weaver; presumably required for Spring Retry's annotation-driven (AOP) support -->
    <dependency>
        <groupId>org.aspectj</groupId>
        <artifactId>aspectjweaver</artifactId>
    </dependency>

    <!-- Lombok: compile-time generation of loggers, accessors and constructors -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>

    <!-- Test support (test scope only) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
二、在 application.yml 中配置 Kafka:
# Kafka settings for the demo. YAML nesting is expressed by indentation, so the keys
# below must keep this exact hierarchy (spring.kafka.consumer.*, spring.kafka.producer.*,
# spring.kafka.listener.*).
spring:
  kafka:
    consumer:
      bootstrap-servers: 127.0.0.1:9092
      # Auto-commit is disabled; together with the manual ack mode below, offsets are
      # committed only when the listener calls Acknowledgment.acknowledge().
      enable-auto-commit: false
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        # Passed through to the JsonDeserializer as "spring.json.trusted.packages":
        # the package whose classes may be deserialized from JSON.
        spring:
          json:
            trusted:
              packages: com.vcredit.kafkademo.dto
      group-id: demo
    producer:
      bootstrap-servers: 127.0.0.1:9092
      # No producer-side retries on send failure.
      retries: 0
      batch-size: 4096
      buffer-memory: 40960
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    listener:
      # Manual acknowledgment, committed immediately when acknowledge() is called.
      ack-mode: manual_immediate
      # Number of concurrent listener consumers.
      concurrency: 2
三、编写消息监听器 Listener:
@Slf4j
@Component
@KafkaListener(id = "multi", topics = "myTopic")
public class Listener {
@KafkaHandler
public void listen(String foo, Acknowledgment acknowledgment) {
log.info("String:" + foo);
acknowledgment.acknowledge();
}
@KafkaHandler
@Retryable(value = Exception.class, maxAttempts = MAX_VALUE, backoff = @Backoff(delay = 2000, multiplier = 1.5))
public void listen(Integer bar, Acknowledgment acknowledgment) throws Exception {
log.info("Integer:" + bar);
if (Math.random() > 0.5) {
log.error("ERROR INTEGER!");
throw new Exception("Integer err");
} else {
acknowledgment.acknowledge();
}
}
@KafkaHandler(isDefault = true)
public void listenDefault(Foo foo, Acknowledgment acknowledgment) {
boolean commitOffsets = false;
while (!commitOffsets) {
try {
handleMessage(foo.getMsg());
commitOffsets = true;
} catch (CustomException e) {
log.error("Exception caught. Not committing offset to Kafka.");
commitOffsets = false;
}
}
if (commitOffsets) {
log.info("No exceptions, committing offsets.");
acknowledgment.acknowledge();
}
}
private void handleMessage(String message) throws CustomException {
log.info("Busy handling message!");
int messageLength = message.length();
log.info("Message length: " + messageLength);
Random random = new Random();
int randomNumber = random.nextInt(100);
log.info("Random number: " + randomNumber);
if ((randomNumber % 2) != 0) {
throw new CustomException("Odd number generated, so throwing this exception");
}
log.info("Even number generated, committing offsets.");
}
}
对象Foo的定义为:
/**
 * Message payload carrying a numeric id and a text message.
 *
 * <p>Accessors, {@code toString()}, an all-arguments constructor (in field declaration
 * order: {@code id}, {@code msg}) and a no-arguments constructor are generated by the
 * Lombok annotations below.
 */
@Getter
@Setter
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class Foo implements Serializable {
private static final long serialVersionUID = -6257237213654214992L;
// Identifier of the message.
private Integer id;
// Message text.
private String msg;
}
四、编写一个 CommandLineRunner,在应用启动后向 myTopic 发送三条不同类型的消息,简单测试一下:
/**
 * Startup smoke test: publishes three messages with different payload types
 * ({@code Foo}, {@code Integer}, {@code String}) to {@code myTopic} and waits a bounded
 * time for the send callbacks to report success or failure.
 */
@Slf4j
@Component
public class MyRunner implements CommandLineRunner {

    /** Topic all test messages are published to. */
    private static final String TOPIC = "myTopic";

    /** Maximum time to wait for all send callbacks, in seconds. */
    private static final long SEND_TIMEOUT_SECONDS = 5;

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Sends the test messages and blocks until every send has completed (successfully or
     * not) or the timeout elapses.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the thread is interrupted while waiting for the callbacks
     */
    @Override
    public void run(String... args) throws Exception {
        Object[] payloads = {new Foo(1, "test1"), 1, "demo"};

        // A latch is single-use, so create one per run, sized to the number of sends.
        CountDownLatch pending = new CountDownLatch(payloads.length);
        for (Object payload : payloads) {
            send(payload, pending);
        }

        // await() returns false on timeout; report it instead of silently continuing.
        if (!pending.await(SEND_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            log.warn("Timed out after {}s waiting for send callbacks; {} still pending",
                    SEND_TIMEOUT_SECONDS, pending.getCount());
        }
    }

    /**
     * Sends one payload to {@link #TOPIC} and counts the latch down once the send
     * completes, logging the result on success or the error on failure.
     */
    private void send(Object payload, CountDownLatch pending) {
        kafkaTemplate.send(TOPIC, payload).addCallback(
                sendResult -> {
                    log.info(sendResult.toString());
                    pending.countDown();
                },
                throwable -> {
                    log.error("error", throwable);
                    pending.countDown();
                });
    }
}