实现之前一定要把JDK,Zookeeper和Kafka都配置好
需要先配置下Kafka
在kafka的config
目录下找到server.properties
配置文件
把listeners
和 advertised.listeners
两处配置的注释去掉,可以根据需要配置连接的服务器外网IP
和端口号
,我这里演示选择的是本地localhost
和默认端口9092
Kafka与SpringBoot进行整合
1.引入依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2.编写生产者和消费者
@RestController
public class KafkaController {
private static Logger logger = LoggerFactory.getLogger(KafkaController.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@GetMapping("/kafka")
public String testKafka() {
int iMax = 100;
for (int i = 1; i < iMax; i++) {
kafkaTemplate.send("test","key" + i, "data" + i);
}
return "success";
}
@KafkaListener(topics = "test")
public void receive(ConsumerRecord<?, ?> consumer) {
logger.info("{} - {}:{}", consumer.topic(), consumer.key(), consumer.value());
}
}
相关代码说明
KafkaTemplate
这个类包装了个生产者Producer
,来提供方便的发送数据到kafka
的主题topic
里面。
send()
方法的源码,KafkaTemplate
类中还重载了很多send()
方法,有需要可以看看源码
public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, key, data);
return this.doSend(producerRecord);
}
通过KafkaTemplate
模板类发送数据。
kafkaTemplate.send(String topic, K key, V data)
,第一个入参是主题,第二个入参是发送的对象,第三个入参是发送的数据。通过@KafkaListener
注解配置用户监听topics
配置文件application.yml
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: kafka2
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
bootstrap-servers
:kafka服务器地址(可以多个)
consumer.group-id
:指定一个默认的组名
不指定的话会报
java.lang.IllegalStateException: No group.id found in consumer config,
container properties, or @KafkaListener annotation;
a group.id is required when group management is used.
auto-offset-reset
:自动偏移量
1.earliest
:当各分区下有已提交的offset
时,从提交的offset
开始消费;无提交的offset
时,从头开始消费
2.latest
:当各分区下有已提交的offset
时,从提交的offset
开始消费;无提交的offset
时,消费新产生的该分区下的数据
3.none
:topic
各分区都存在已提交的offset
时,从offset
后开始消费;只要有一个分区不存在已提交的offset
,则抛出异常
这个属性也是必须配置的,不然也是会报错的
org.apache.kafka.common.config.ConfigException:
Invalid value for configuration auto.offset.reset:
String must be one of: latest, earliest, none
消息序列化和反序列化
在使用Kafka发送接收消息时,生产者producer
端需要序列化,消费者consumer
端需要反序列化,由于网络传输过来的是byte[]
,只有反序列化后才能得到生产者发送的真实的消息内容。这样消息才能进行网络传输
consumer.key-deserializer
和consumer.value-deserializer
是消费者key/value
反序列化
producer.key-deserializer
和producer.value-deserializer
是生产者key/value
序列化
StringDeserializer
是内置的字符串反序列化方式
public class StringDeserializer implements Deserializer<String> {
public String deserialize(String topic, byte[] data) {
try {
//如果数据为空,那么直接返回null即可,否则将byte[]反序列化,即转为String即可
return data == null ? null : new String(data, this.encoding);
} catch (UnsupportedEncodingException var4) {
throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + this.encoding);
}
}
......
}
StringSerializer
是内置的字符串序列化方式
public class StringSerializer implements Serializer<String> {
public byte[] serialize(String topic, String data) {
try {
//如果数据为空,那么直接返回null即可,否则将String序列化,即转为byte[]即可
return data == null ? null : data.getBytes(this.encoding);
} catch (UnsupportedEncodingException var4) {
throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + this.encoding);
}
}
......
}
在org.apache.kafka.common.serialization
源码包中还提供了多种类型的序列化和反序列化方式
要自定义序列化方式,需要实现接口Serializer
要自定义反序列化方式,需要实现接口Deserializer
详细可以参考
https://blog.csdn.net/shirukai/article/details/82152172
启动项目进行测试
这是Kafka
的消费者Consumer
的配置信息,每个消费者都会输出该配置信息
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = kafka2
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2018-11-22 14:16:53.465 INFO 11980 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 2.0.0
2018-11-22 14:16:53.465 INFO 11980 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : 3402a8361b734732
2018-11-22 14:16:57.664 INFO 11980 --- [ main] org.apache.kafka.clients.Metadata : Cluster ID: d3n7Snc2TFmSFcNsHjqgVw
访问http://localhost:8080/kafka,就可以看到控制台打印消息了