1.导包pom
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>3.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.9.0</version>
</dependency>
2.配置文件application.yml
kafka:
producer:
batch-size: 30
consumer:
group-id: test
bootstrap-servers: 192.168.72.128:9092
3.生产者
@RestController
public class KafkaProducer {
@Autowired
KafkaTemplate<String,String> kafkaTemplate;
@GetMapping("test/kafka/producer/{msg}")
public String testKafkaProducer(@PathVariable("msg") String msg){
kafkaTemplate.send("top_name","key",msg);
return "ok";
}
}
4.消费者
@Component
@Slf4j
public class KafkaConsumer {
@KafkaListener(topics = {"top_name"})
public void handMessage(ConsumerRecord<?,?> consumerRecord){
Optional<?> optional = Optional.ofNullable(consumerRecord.value());
if (optional.isPresent()) {
Object msg = optional.get();
log.info("message:{}", msg);
}
}
}
5.启动类
@SpringBootApplication(scanBasePackages = {"com.example.demo"})
@EnableAspectJAutoProxy(proxyTargetClass = true)
@EnableScheduling
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
6.打开浏览器访问
localhost:7070/test/kafka/producer/kangkang