Spring Boot 集成 Kafka
1. 创建 Spring Boot 项目并引入依赖
<!-- Inherit dependency and plugin management from the Spring Boot 2.1.5.RELEASE parent POM. -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.5.RELEASE</version>
</parent>
<dependencies>
    <!-- Core Spring Boot starter. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- Spring for Apache Kafka; no version is given here, so it is taken from the parent's dependency management. -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- Test support only: scoped to "test" so it stays off the compile/runtime classpath. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
2.编写配置文件
# Kafka broker address used for the initial connection.
# NOTE(review): "ip" looks like a placeholder host - replace it with the real broker address.
spring.kafka.bootstrap-servers=ip:9092
# Producer: number of retries, and acks=all to wait for the full acknowledgement of each send.
spring.kafka.producer.retries=5
spring.kafka.producer.acks=all
# Producer batching/buffering sizes (16384 = 16 KiB, 33554432 = 32 MiB).
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# Enable transaction control for the producer
spring.kafka.producer.transaction-id-prefix=transaction-id-
# Keys and values are sent as plain strings.
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Passed through as the native Kafka producer property "enable.idempotence".
spring.kafka.producer.properties.enable.idempotence=true
# Consumer group and starting position when the group has no committed offset.
spring.kafka.consumer.group-id=group1
spring.kafka.consumer.auto-offset-reset=earliest
# NOTE(review): offset auto-commit is enabled together with a transactional producer and
# read_committed isolation - confirm this combination is intended.
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
# Passed through as the native Kafka consumer property "isolation.level".
spring.kafka.consumer.properties.isolation.level=read_committed
# Keys and values are read back as plain strings.
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
3. 创建消费者
/**
 * Spring Boot entry point that also hosts the Kafka listener methods for
 * {@code topic01}, {@code topic02} and {@code topic03}.
 */
@SpringBootApplication
public class KafkaSpringBootApplication {

    /**
     * Starts the Spring application and then blocks on standard input.
     *
     * @param args command-line arguments handed to {@link SpringApplication#run}
     * @throws IOException if reading from standard input fails
     */
    public static void main(String[] args) throws IOException {
        SpringApplication.run(KafkaSpringBootApplication.class, args);
        // Park the main thread until a byte arrives on stdin (or stdin reaches EOF).
        System.in.read();
    }

    /**
     * Listens on {@code topic01} and prints each received record.
     *
     * @param consumerRecord the record delivered by the listener container
     */
    @KafkaListener(topics = {"topic01"})
    public void receive01(ConsumerRecord<String, String> consumerRecord) {
        System.out.println("topic01 record:" + consumerRecord);
    }

    /**
     * Listens on {@code topic02} and forwards a decorated copy of the record
     * value to {@code topic03} via {@code @SendTo}.
     *
     * @param consumerRecord the record delivered by the listener container
     * @return the record value followed by a tab and a fixed suffix
     */
    @KafkaListener(topics = {"topic02"})
    @SendTo("topic03")
    public String receive02(ConsumerRecord<String, String> consumerRecord) {
        return consumerRecord.value() + "\t" + "fufu zhen shuai ";
    }

    /**
     * Listens on {@code topic03} and prints each received record.
     *
     * @param consumerRecord the record delivered by the listener container
     */
    @KafkaListener(topics = {"topic03"})
    public void receive03(ConsumerRecord<String, String> consumerRecord) {
        System.out.println("topic03 record:" + consumerRecord);
    }
}
4. Spring Boot 完成对 Kafka 事务的控制
/**
 * Integration tests that send records through the injected
 * {@link KafkaTemplate}, once outside and once inside a Kafka transaction.
 */
@SpringBootTest(classes = KafkaSpringBootApplication.class)
@RunWith(SpringRunner.class)
public class KafkaSpringbootTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends without a surrounding transaction. The send is expected to be
     * rejected because no transaction has been started, so the test asserts
     * that failure instead of merely letting the test error out.
     */
    @Test(expected = IllegalStateException.class)
    public void testSend1() {
        kafkaTemplate.send(new ProducerRecord<>("topic01", "key007", "this is kafkatemplate send message"));
    }

    /**
     * Sends inside a transaction started by
     * {@link KafkaTemplate#executeInTransaction}.
     */
    @Test
    public void testSend2() {
        kafkaTemplate.executeInTransaction(operations -> {
            operations.send(new ProducerRecord<>("topic01", "key007-tran", "this is kafkatemplate send message by tranceation"));
            // The callback result is not used by this test.
            return null;
        });
    }
}
5. 利用 Spring 事务（@Transactional）完成 Kafka 的事务控制
/**
 * Drives the {@link IMessageSender} bean obtained from the Spring test context.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = KafkaSpringBootApplication.class)
public class MessageSenderTest {

    @Autowired
    private IMessageSender messageSender;

    /**
     * Sends one keyed message to {@code topic01} through the service.
     */
    @Test
    public void sendMessage() {
        final String topic = "topic01";
        final String key = "002-service";
        final String payload = "message send by service template";
        messageSender.sendMessage(topic, key, payload);
    }

    /**
     * Invokes the service's {@code printInfo} method with a fixed text.
     */
    @Test
    public void printInfo() {
        final String text = "print info by which one service";
        messageSender.printInfo(text);
    }
}
/**
 * Contract for a component that can send a keyed message to a topic and
 * print an informational text.
 */
public interface IMessageSender {

    /**
     * Sends a message.
     *
     * @param topic   name of the destination topic
     * @param key     key of the message
     * @param message message content
     */
    void sendMessage(String topic, String key, String message);

    /**
     * Prints the given information.
     *
     * @param info text to print
     */
    void printInfo(String info);
}
/**
 * {@link IMessageSender} implementation that publishes through a
 * {@link KafkaTemplate}.
 *
 * <p>The class is annotated with {@code @Transactional}, so its public methods
 * are invoked within a Spring-managed transaction.
 * NOTE(review): presumably this is the Kafka transaction enabled by the
 * producer's transaction-id-prefix - confirm which transaction manager is used.
 */
@Service
@Transactional
public class MessageSender implements IMessageSender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends {@code message} under {@code key} to {@code topic}.
     *
     * @param topic   name of the destination topic
     * @param key     key of the record
     * @param message value of the record
     */
    @Override
    public void sendMessage(String topic, String key, String message) {
        kafkaTemplate.send(new ProducerRecord<>(topic, key, message));
    }

    /**
     * Prints {@code info} to standard output, prefixed with a fixed marker.
     *
     * @param info text to print
     */
    @Override
    public void printInfo(String info) {
        System.out.println("11111111111111111111" + info);
    }
}