下面展示的例子是kafka的客户端的使用,包含了发送端的同步发送消息和异步发送消息的使用,以及接收端的消费消息的使用,以及自定分区的使用
1.环境的搭建
需要配置kafka的集群环境: 可以参考https://www.jianshu.com/p/d39ade36f606
需要依赖kafka的客户端的jar,maven的依赖如下:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
2.kafka的发送端的同步发送和异步发送
这里可以参考kafkaProducer的api : http://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
在代码执行之前,我默认在kafka自己创建了三个分区的mytopic,并且副本为1
bin/kafka-topics.sh --zookeeper 192.168.44.129:2181 --partitions 3 --replication-factor 1 --create --topic my-topic
代码入下:
/**
* @Project: kafka
* @description: kafka的producer的同步发送和异步发送
* @author: sunkang
* @create: 2018-12-16 21:24
* @ModificationHistory who when What
**/
public class KafkaProducerDemo extends Thread {
private final KafkaProducer<Integer,String> producer;
private String topic;
//是否为异步发送
private boolean async;
public KafkaProducerDemo(String topic,boolean async){
Properties properties = new Properties();
//bootstrap.servers kafka的集群地址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.44.129:9092,192.168.44.129:9093,192.168.44.129:9094");
//client.id 客户端id
properties.put(ProducerConfig.CLIENT_ID_CONFIG,"KafkaProducerDemo");
//acks =-1,表示集群中的所有成员都需要确认
properties.put(ProducerConfig.ACKS_CONFIG,"-1");
//发送到同一分区,批量发送数据包的大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
//两次发送的时间间隔内,把所有的request进行聚合在发送
properties.put(ProducerConfig.LINGER_MS_CONFIG,10);
//发送的消息的key的序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.IntegerSerializer");
//发送消息的value的序列化
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<Integer, String>(properties);
this.topic = topic;
this.async = async;
}
public void run(){
int num = 0;
while (true){
String message = "message_"+num;
System.out.println("begin send message"+ message);
//异步发送
if(async){
producer.send(new ProducerRecord<Integer, String>(topic, num, message), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e != null){
e.printStackTrace();
}else{
System.out.println("async-offset:"+ recordMetadata.offset()+"partition:"+recordMetadata.partition());
}
}
});
}else{ //同步发送
Future<RecordMetadata> recordMetadataFuture = producer.send(new ProducerRecord<Integer, String>(topic,num,message));
try {
RecordMetadata recordMetadata = recordMetadataFuture.get();
System.out.println("sync-offset:"+ recordMetadata.offset()+"partition:"+ recordMetadata.partition());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
num++;
try {//间隔一秒之后在发送
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
//异步发送
new KafkaProducerDemo("my-topic",true).start();
}
}
输出如下:默认是有三个分区的,可以看到消息存储的分区都不一样,实现了消息的分片的作用
begin send messagemessage_0
async-offset:56->partition:1
begin send messagemessage_1
async-offset:59->partition:0
begin send messagemessage_2
async-offset:57->partition:2
3.kafka的消费端的消费
消费端的例子可以参考官网KafkaConsumer的api: http://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
- 消费端的自动偏移量提交和手动偏移提交
/**
* @Project: kafka
* @description:消费端的自动偏移量提交和手动偏移提交
* @author: sunkang
* @create: 2018-12-16 21:34
* @ModificationHistory who when What
**/
public class KafkaConsumerDemo extends Thread{
private final KafkaConsumer kafkaConsumer;
private final boolean autoOffesetCommit;
public KafkaConsumerDemo(String topic, boolean autoOffesetCommitt) {
Properties properties=new Properties();
//服务地址
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"192.168.44.129:9092,192.168.44.129:9093,192.168.44.129:9094");
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KafkaConsumerDemo");
this.autoOffesetCommit = autoOffesetCommitt;
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoOffesetCommit == true? "true":"false");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.IntegerDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
kafkaConsumer=new KafkaConsumer(properties);
kafkaConsumer.subscribe(Collections.singletonList(topic));
}
public void run(){
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
//一次性批量确认
final int minBatchSize = 10;
while (true){
if(this.autoOffesetCommit){//自动偏移提交
//从broker拉取消息
ConsumerRecords<Integer,String> consumerRecords= kafkaConsumer.poll(100);
for(ConsumerRecord record : consumerRecords ){
System.out.println("message receive:"+ record.value()+"->offset:"+record.offset()+"->partition:"+record.partition()); }
}else{//需要手动偏移量控制
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("message receive:"+ record.value()+"->offset:"+record.offset()+"->partition:"+record.partition());
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
//当接收消息需要处理的进行逻辑处理的时候,需要手动偏移量控制,比如当消息插入数据库完全成功的时候, 才认为消息完全消费了
// insertIntoDb(buffer);
kafkaConsumer.commitSync();
buffer.clear();
}
}
}
}
public static void main(String[] args) {
new KafkaConsumerDemo("my-topic",true).start();
}
}
4.kafka自定义分区
自定义分区策略是根据消息的key来映射具体的分区,需要实现org.apache.kafka.clients.producer.Partitioner
接口
/**
* 自定义分区策略
*/
public class MyPartition implements Partitioner {
private Random random=new Random();
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//获得分区列表
List<PartitionInfo> partitionInfos=cluster.partitionsForTopic(topic);
int partitionNum=0;
if(key==null){
partitionNum=random.nextInt(partitionInfos.size()); //随机分区
}else{
partitionNum=Math.abs((key.hashCode())%partitionInfos.size());
}
System.out.println("key ->"+key+"->value->"+value+"->"+partitionNum);
return partitionNum; //指定发送的分区值
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
如果要指定自己实现的自定义分区策略,需要增加partitioner.class的配置属性
Properties properties=new Properties();
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.kafka.partion.MyPartition");
producer=new KafkaProducer<Integer, String>(properties);
5.配置信息分析
发送端的可选配置信息分析
acks
acks 配置表示 producer 发送消息到 broker 上以后的确认值。有三个可选项 :
0:表示 producer 不需要等待 broker 的消息确认。这个选项时延最小但同时风险最大(因为当 server 宕机时,数据将会丢失)。
1:表示 producer 只需要获得 kafka 集群中的 leader 节点确认即可,这个选择时延较小同时确保了 leader 节点确认接收成功
all(-1):需要 ISR 中所有的 Replica 给予接收确认,速度最慢,安全性最
但是由于 ISR 可能会缩小到仅包含一个 Replica,所以设置参数为 all 并不能一定避免数据丢失
batch.size
生产者发送多个消息到 broker 上的同一个分区时,为了减少网络请求带来的性能开销,通过批量的方式来提交消息,可以通过这个参数来控制批量提交的字节数大小,默认大小是 16384byte,也就是 16kb,意味着当一批消息大小达到指定的 batch.size 的时候会统一发送
linger.ms
Producer 默认会把两次发送时间间隔内收集到的所有 Requests 进行一次聚合然后再发送,以此提高吞吐量,而 linger.ms 就是为每次发送到 broker 的请求,增加一些 delay,以此来聚合更多的Message 请求。 这个有点想 TCP 里面的Nagle 算法,在 TCP 协议的传输中,为了减少大量小数据包的发送,采用了Nagle 算法,也就是基于小包的等停协议
batch.size 和 linger.ms 这两个参数是 kafka 性能优化的关键参数,很多同学会发现 batch.size 和 linger.ms 这两者的作用是一样的,如果两个都配置了,那么怎么工作的呢?实际上,当二者都配置的时候,只要满足其中一个要求,就会发送请求到 broker 上
max.request.size
设置请求的数据的最大字节数,为了防止发生较大的数据包影响到吞吐量,默认值为 1MB
消费端的可选配置分析
group.id
consumer group 是 kafka 提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的 ID,即 group ID。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一个消费
如下图所示,分别有三个消费者,属于两个不同的 group,那么对于 firstTopic 这个 topic 来说,这两个组的消费者都能同时消费这个 topic 中的消息,对于此事的架构来说,这个 firstTopic 就类似于 ActiveMQ 中的 topic 概念。
如最下图所示,如果 3 个消费者都属于同一个group,那么此事 firstTopic 就是一个 Queue 的概念
enable.auto.commit
消费者消费消息以后自动提交,只有当消息提交以后,该消息才不会被再次接收到,还可以配合 auto.commit.interval.ms 控制自动提交的频率。
当然,我们也可以通过 consumer.commitSync()的方式实现手动提交
auto.offset.reset
这个参数是针对新的 groupid 中的消费者而言的,当有新 groupid 的消费者来消费指定的 topic 时,对于该参数的配置,会有不同的语义
auto.offset.reset=latest 情况下,新的消费者将会从其他消费者最后消费的offset 处开始消费 Topic 下的消息
auto.offset.reset= earliest 情况下,新的消费者会从该 topic 最早的消息开始消费
auto.offset.reset=none 情况下,新的消费者加入以后,由于之前不存在offset,则会直接抛出异常。
max.poll.records
此设置限制每次调用 poll 返回的消息数,这样可以更容易的预测每次 poll 间隔要处理的最大值。通过调整此值,可以减少 poll 间隔
6.与spring-kafka集成
依赖的maven的配置如下
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.0.4.RELEASE</version>
</dependency>
发送端的producerKafka.xml的spring配置如下
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="producerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="192.168.44.129:9092,192.168.44.129:9093,192.168.44.129:9094"/>
<entry key="client.id" value="sping-kafka-producer"/>
<entry key="acks" value="-1"/>
<entry key="key.serializer" value="org.apache.kafka.common.serialization.IntegerSerializer"/>
<entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
</map>
</constructor-arg>
</bean>
<bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg ref="producerProperties"/>
</bean>
<bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg ref="producerFactory"/>
<constructor-arg name="autoFlush" value="true"/>
</bean>
</beans>
消费端的配置consumerKafka.xml的配置如下
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="consumerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="192.168.44.129:9092,192.168.44.129:9093,192.168.44.129:9094"/>
<entry key="group.id" value="registryConsumer"/>
<entry key="enable.auto.commit" value="true"/>
<entry key="auto.commit.interval.ms" value="1000"/>
<entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer"/>
<entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
</map>
</constructor-arg>
</bean>
<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg ref="consumerProperties"/>
</bean>
<bean id="registryListener" class="com.kafka.spring.RegistryListener"/>
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg name="topics" value="my-topic"/>
<property name="messageListener" ref="registryListener"/>
</bean>
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
<constructor-arg ref="consumerFactory"/>
<constructor-arg ref="containerProperties"/>
</bean>
</beans>
发送端的启动代码
/**
* @Project: kafka
* @description: 通过KafkaTemplate进行发送消息
* @author: sunkang
* @create: 2018-12-23 18:52
* @ModificationHistory who when What
**/
public class SpringKafkaProducerDemo {
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("classpath:producerKafka.xml");
KafkaTemplate kafkaTemplate = context.getBean("kafkaTemplate", KafkaTemplate.class);
kafkaTemplate.send("my-topic",1,"message_1");
kafkaTemplate.send("my-topic",2,"message_2");
}
}
消费端的启动代码
/**
* @Project: kafka
* @description: 消费端的启动代码
* @author: sunkang
* @create: 2018-12-23 18:51
* @ModificationHistory who when What
**/
public class SpringKafkaConsumerDemo {
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("consumerKafka.xml");
}
}
//设置消息监听类
class RegistryListener implements MessageListener<Integer,String> {
@Override
public void onMessage(ConsumerRecord<Integer, String> integerStringConsumerRecord) {
System.out.println("收到了消息");
System.out.println("key:"+integerStringConsumerRecord.key()+"->value:"+integerStringConsumerRecord.value());
}
}