1、基本环境
JDK:1.8
SpringBoot版本:2.3.8.RELEASE
Kafka版本:2.7(单机部署)
2、创建项目
在IDEA中新建SpringBoot项目
image.png
选择需要引入的依赖
image.png
最终pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the kafka-study demo: a Spring Boot web application that uses spring-kafka. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- The Spring Boot parent POM; the dependencies below declare no <version> of their own. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.8.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<!-- Coordinates of this project. -->
<groupId>com.study</groupId>
<artifactId>kafka-study</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka-study</name>
<description>学习Kafka</description>
<properties>
<!-- Java language level used for the build. -->
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- Spring Boot web starter. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring for Apache Kafka. -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Lombok, declared optional so it is not passed on to projects depending on this one. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Test-scoped Spring Boot test starter, with the JUnit vintage engine excluded. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Test-scoped Kafka test support for Spring. -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Spring Boot Maven plugin, configured to exclude Lombok from the packaged application. -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
3、Kafka配置类
首先在application.properties中新建一个配置:
kafka.broker.list=192.168.30.128:9092
然后新建一个KafkaConfig配置类:
package com.study.kafka.config;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Properties;
/**
 * Spring configuration that builds the Kafka producer settings and exposes a
 * single shared {@link Producer} bean.
 *
 * <p>The broker address list is read from the {@code kafka.broker.list} property.
 */
@Configuration
public class KafkaConfig {

    /** Topic name used by the demo. */
    public static final String TOPIC = "syslogs";

    /** Bootstrap broker list, injected from {@code kafka.broker.list}. */
    @Value("${kafka.broker.list}")
    public String brokerList;

    /**
     * Assembles the producer configuration.
     *
     * @return a fresh {@link Properties} instance holding the producer settings
     */
    public Properties producerConfigs() {
        // Keys and values are both serialized as strings.
        final String stringSerializer = "org.apache.kafka.common.serialization.StringSerializer";

        Properties settings = new Properties();

        // Connection and identity.
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        settings.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");

        // Serialization.
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, stringSerializer);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, stringSerializer);

        // 20M message buffer.
        settings.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 20000000);
        // How long send() may block when the producer runs out of space (default is 60s).
        settings.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);
        // Number of producer retries.
        settings.put(ProducerConfig.RETRIES_CONFIG, 0);
        // Reusable size of a ProducerBatch (taken from the accumulator's BufferPool).
        settings.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // A batch is sent once it is full or has waited longer than LINGER_MS_CONFIG.
        settings.put(ProducerConfig.LINGER_MS_CONFIG, 1);

        return settings;
    }

    /**
     * Creates the producer bean from {@link #producerConfigs()}.
     *
     * <p>KafkaProducer is thread-safe, so one instance can be shared across threads.
     *
     * <p>NOTE(review): the declared type parameters are {@code <Integer, Object>} while
     * both configured serializers are StringSerializer; this looks inconsistent and
     * should probably be {@code <String, String>} - confirm against callers before changing.
     *
     * @return the shared producer instance
     */
    @Bean
    public Producer<Integer, Object> getKafkaProducer() {
        Properties settings = producerConfigs();
        return new KafkaProducer<>(settings);
    }
}
4、使用测试类发送消息
package com.study.kafka;
import com.study.kafka.config.KafkaConfig;
import com.study.kafka.domain.LogInfo;
import kafka.utils.Json;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.Date;
@Slf4j
@SpringBootTest
class KafkaStudyApplicationTests {
@Autowired
Producer producer;
@Test
void sendKafkaMsg() {
LogInfo logInfo = new LogInfo();
logInfo.setId(10000L);
logInfo.setIp("11.22.33.44");
logInfo.setDeviceType("Huawei Mate40");
logInfo.setOsVersion("11.0");
logInfo.setCreateDate(new Date());
// json消息
String msg = Json.encodeAsString(logInfo);
ProducerRecord<String,String> record = new ProducerRecord<>(KafkaConfig.TOPIC,msg);
try {
// 异步发送消息
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e == null) {
log.info("发送成功,分区:{},偏移量:{}",metadata.partition(),metadata.offset());
}else {
log.info("异常:{}",e.getMessage());
}
}
});
}catch (Exception ex) {
ex.printStackTrace();
}
}
}
5、使用kafka-console-consumer.sh消费消息
在控制台执行命令,监听消息:
[root@master kafka_2.12-2.7.0]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.30.128:9092 --topic syslogs --from-beginning
输出如下:
{"id":10000,"ip":"11.22.33.44","deviceType":"Huawei Mate40","osVersion":"11.0","createDate":1611583407099}
{"id":10000,"ip":"11.22.33.44","deviceType":"Huawei Mate40","osVersion":"11.0","createDate":1611583457711}
{"id":10000,"ip":"11.22.33.44","deviceType":"Huawei Mate40","osVersion":"11.0","createDate":1611583493982}
作者:大雄喵