(4)kafka的应用

下面展示的例子是kafka的客户端的使用,包含了发送端的同步发送消息和异步发送消息的使用,以及接收端的消费消息的使用,以及自定分区的使用

1.环境的搭建

需要配置kafka的集群环境: 可以参考https://www.jianshu.com/p/d39ade36f606
需要依赖kafka的客户端的jar,maven的依赖如下:

   <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>1.0.0</version>
    </dependency>

2.kafka的发送端的同步发送和异步发送

这里可以参考kafkaProducer的api : http://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

在代码执行之前,我默认在kafka自己创建了三个分区的mytopic,并且副本为1

 bin/kafka-topics.sh  --zookeeper 192.168.44.129:2181 --partitions 3 --replication-factor 1 --create --topic my-topic

代码入下:

/**
 * @Project: kafka
 * @description:  kafka的producer的同步发送和异步发送
 * @author: sunkang
 * @create: 2018-12-16 21:24
 * @ModificationHistory who      when       What
 **/
public class KafkaProducerDemo extends Thread {
    private final KafkaProducer<Integer,String> producer;
    private  String topic;
    //是否为异步发送
    private boolean async;

    public KafkaProducerDemo(String topic,boolean async){
        Properties properties = new Properties();
        //bootstrap.servers  kafka的集群地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.44.129:9092,192.168.44.129:9093,192.168.44.129:9094");
        //client.id 客户端id
        properties.put(ProducerConfig.CLIENT_ID_CONFIG,"KafkaProducerDemo");
        //acks =-1,表示集群中的所有成员都需要确认
        properties.put(ProducerConfig.ACKS_CONFIG,"-1");
        //发送到同一分区,批量发送数据包的大小
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
        //两次发送的时间间隔内,把所有的request进行聚合在发送
        properties.put(ProducerConfig.LINGER_MS_CONFIG,10);
        //发送的消息的key的序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.IntegerSerializer");
        //发送消息的value的序列化
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        producer  = new KafkaProducer<Integer, String>(properties);
        this.topic = topic;
        this.async = async;
    }

    public void run(){
        int num  = 0;
        while (true){
            String message = "message_"+num;
            System.out.println("begin send message"+ message);
            //异步发送
            if(async){
                producer.send(new ProducerRecord<Integer, String>(topic, num, message), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if(e != null){
                            e.printStackTrace();
                        }else{
                            System.out.println("async-offset:"+ recordMetadata.offset()+"partition:"+recordMetadata.partition());
                        }
                    }
                });
            }else{   //同步发送
                Future<RecordMetadata>  recordMetadataFuture = producer.send(new ProducerRecord<Integer, String>(topic,num,message));
                try {
                    RecordMetadata  recordMetadata =  recordMetadataFuture.get();
                    System.out.println("sync-offset:"+ recordMetadata.offset()+"partition:"+ recordMetadata.partition());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
            num++;
            try {//间隔一秒之后在发送
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        //异步发送
        new KafkaProducerDemo("my-topic",true).start();
    }
 }

输出如下:默认是有三个分区的,可以看到消息存储的分区都不一样,实现了消息的分片的作用

begin send messagemessage_0
async-offset:56->partition:1
begin send messagemessage_1
async-offset:59->partition:0
begin send messagemessage_2
async-offset:57->partition:2

3.kafka的消费端的消费

消费端的例子可以参考官网KafkaConsumer的api: http://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

  • 消费端的自动偏移量提交和手动偏移提交
/**
 * @Project: kafka
 * @description:消费端的自动偏移量提交和手动偏移提交
 * @author: sunkang
 * @create: 2018-12-16 21:34
 * @ModificationHistory who      when       What
 **/
public class KafkaConsumerDemo  extends Thread{

    private final KafkaConsumer kafkaConsumer;

    private  final   boolean autoOffesetCommit;


    public KafkaConsumerDemo(String topic, boolean autoOffesetCommitt) {
        Properties properties=new Properties();
        //服务地址
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.44.129:9092,192.168.44.129:9093,192.168.44.129:9094");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KafkaConsumerDemo");
        this.autoOffesetCommit = autoOffesetCommitt;
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,  autoOffesetCommit == true? "true":"false");
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        kafkaConsumer=new KafkaConsumer(properties);
        kafkaConsumer.subscribe(Collections.singletonList(topic));
    }

    public void run(){
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        //一次性批量确认
        final int minBatchSize = 10;

        while (true){
            if(this.autoOffesetCommit){//自动偏移提交
                //从broker拉取消息
                ConsumerRecords<Integer,String> consumerRecords= kafkaConsumer.poll(100);
                for(ConsumerRecord record : consumerRecords ){
                    System.out.println("message  receive:"+ record.value()+"->offset:"+record.offset()+"->partition:"+record.partition());                }
            }else{//需要手动偏移量控制
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("message  receive:"+ record.value()+"->offset:"+record.offset()+"->partition:"+record.partition());
                    buffer.add(record);
                }
                if (buffer.size() >= minBatchSize) {
                  //当接收消息需要处理的进行逻辑处理的时候,需要手动偏移量控制,比如当消息插入数据库完全成功的时候, 才认为消息完全消费了
                 //   insertIntoDb(buffer);
                    kafkaConsumer.commitSync();
                    buffer.clear();
                }
            }

        }
    }
    public static void main(String[] args) {
        new KafkaConsumerDemo("my-topic",true).start();
    }

}

4.kafka自定义分区

自定义分区策略是根据消息的key来映射具体的分区,需要实现org.apache.kafka.clients.producer.Partitioner接口

/**
 * 自定义分区策略
 */
public class MyPartition implements Partitioner {
    private Random random=new Random();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //获得分区列表
        List<PartitionInfo> partitionInfos=cluster.partitionsForTopic(topic);
        int partitionNum=0;
        if(key==null){
            partitionNum=random.nextInt(partitionInfos.size()); //随机分区
        }else{
            partitionNum=Math.abs((key.hashCode())%partitionInfos.size());
        }
        System.out.println("key ->"+key+"->value->"+value+"->"+partitionNum);
        return partitionNum;  //指定发送的分区值
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

如果要指定自己实现的自定义分区策略,需要增加partitioner.class的配置属性

Properties properties=new Properties();
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.kafka.partion.MyPartition");
producer=new KafkaProducer<Integer, String>(properties);

5.配置信息分析

发送端的可选配置信息分析
  • acks

acks 配置表示 producer 发送消息到 broker 上以后的确认值。有三个可选项 :
0:表示 producer 不需要等待 broker 的消息确认。这个选项时延最小但同时风险最大(因为当 server 宕机时,数据将会丢失)。
1:表示 producer 只需要获得 kafka 集群中的 leader 节点确认即可,这个选择时延较小同时确保了 leader 节点确认接收成功
all(-1):需要 ISR 中所有的 Replica 给予接收确认,速度最慢,安全性最

但是由于 ISR 可能会缩小到仅包含一个 Replica,所以设置参数为 all 并不能一定避免数据丢失

  • batch.size

生产者发送多个消息到 broker 上的同一个分区时,为了减少网络请求带来的性能开销,通过批量的方式来提交消息,可以通过这个参数来控制批量提交的字节数大小,默认大小是 16384byte,也就是 16kb,意味着当一批消息大小达到指定的 batch.size 的时候会统一发送

  • linger.ms

Producer 默认会把两次发送时间间隔内收集到的所有 Requests 进行一次聚合然后再发送,以此提高吞吐量,而 linger.ms 就是为每次发送到 broker 的请求,增加一些 delay,以此来聚合更多的Message 请求。 这个有点想 TCP 里面的Nagle 算法,在 TCP 协议的传输中,为了减少大量小数据包的发送,采用了Nagle 算法,也就是基于小包的等停协议

batch.size 和 linger.ms 这两个参数是 kafka 性能优化的关键参数,很多同学会发现 batch.size 和 linger.ms 这两者的作用是一样的,如果两个都配置了,那么怎么工作的呢?实际上,当二者都配置的时候,只要满足其中一个要求,就会发送请求到 broker 上

  • max.request.size

设置请求的数据的最大字节数,为了防止发生较大的数据包影响到吞吐量,默认值为 1MB

消费端的可选配置分析
  • group.id

consumer group 是 kafka 提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的 ID,即 group ID。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一个消费

如下图所示,分别有三个消费者,属于两个不同的 group,那么对于 firstTopic 这个 topic 来说,这两个组的消费者都能同时消费这个 topic 中的消息,对于此事的架构来说,这个 firstTopic 就类似于 ActiveMQ 中的 topic 概念。

如最下图所示,如果 3 个消费者都属于同一个group,那么此事 firstTopic 就是一个 Queue 的概念

  • enable.auto.commit

消费者消费消息以后自动提交,只有当消息提交以后,该消息才不会被再次接收到,还可以配合 auto.commit.interval.ms 控制自动提交的频率。

当然,我们也可以通过 consumer.commitSync()的方式实现手动提交

  • auto.offset.reset

这个参数是针对新的 groupid 中的消费者而言的,当有新 groupid 的消费者来消费指定的 topic 时,对于该参数的配置,会有不同的语义

auto.offset.reset=latest 情况下,新的消费者将会从其他消费者最后消费的offset 处开始消费 Topic 下的消息
auto.offset.reset= earliest 情况下,新的消费者会从该 topic 最早的消息开始消费
auto.offset.reset=none 情况下,新的消费者加入以后,由于之前不存在offset,则会直接抛出异常。

  • max.poll.records

此设置限制每次调用 poll 返回的消息数,这样可以更容易的预测每次 poll 间隔要处理的最大值。通过调整此值,可以减少 poll 间隔

6.与spring-kafka集成

  • 依赖的maven的配置如下
 <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.1.0</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.1.7.RELEASE</version>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>5.0.4.RELEASE</version>
    </dependency>
  • 发送端的producerKafka.xml的spring配置如下
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="producerProperties" class="java.util.HashMap">
      <constructor-arg>
          <map>
                <entry key="bootstrap.servers" value="192.168.44.129:9092,192.168.44.129:9093,192.168.44.129:9094"/>
                <entry key="client.id" value="sping-kafka-producer"/>
                <entry key="acks" value="-1"/>
                <entry key="key.serializer" value="org.apache.kafka.common.serialization.IntegerSerializer"/>
                <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
          </map>
      </constructor-arg>

    </bean>

    <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg ref="producerProperties"/>
    </bean>

    <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory"/>
        <constructor-arg name="autoFlush" value="true"/>
    </bean>

</beans>
  • 消费端的配置consumerKafka.xml的配置如下
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="192.168.44.129:9092,192.168.44.129:9093,192.168.44.129:9094"/>
                <entry key="group.id" value="registryConsumer"/>
                <entry key="enable.auto.commit" value="true"/>
                <entry key="auto.commit.interval.ms" value="1000"/>
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer"/>
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
            </map>
        </constructor-arg>

    </bean>

    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg ref="consumerProperties"/>
    </bean>


    <bean id="registryListener" class="com.kafka.spring.RegistryListener"/>

    <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg name="topics" value="my-topic"/>
        <property name="messageListener" ref="registryListener"/>

    </bean>

    <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
        <constructor-arg ref="consumerFactory"/>
        <constructor-arg ref="containerProperties"/>
    </bean>
</beans>
  • 发送端的启动代码
/**
 * @Project: kafka
 * @description:  通过KafkaTemplate进行发送消息
 * @author: sunkang
 * @create: 2018-12-23 18:52
 * @ModificationHistory who      when       What
 **/
public class SpringKafkaProducerDemo {
    public static void main(String[] args) {

        ApplicationContext context  = new ClassPathXmlApplicationContext("classpath:producerKafka.xml");

        KafkaTemplate  kafkaTemplate =  context.getBean("kafkaTemplate", KafkaTemplate.class);

        kafkaTemplate.send("my-topic",1,"message_1");
        kafkaTemplate.send("my-topic",2,"message_2");
    }
}
  • 消费端的启动代码

/**
 * @Project: kafka
 * @description:  消费端的启动代码
 * @author: sunkang
 * @create: 2018-12-23 18:51
 * @ModificationHistory who      when       What
 **/
public class SpringKafkaConsumerDemo {
    public static void main(String[] args) {
        ApplicationContext context  = new ClassPathXmlApplicationContext("consumerKafka.xml");
    }
}
//设置消息监听类
class RegistryListener implements MessageListener<Integer,String> {

    @Override
    public void onMessage(ConsumerRecord<Integer, String> integerStringConsumerRecord) {
        System.out.println("收到了消息");
        System.out.println("key:"+integerStringConsumerRecord.key()+"->value:"+integerStringConsumerRecord.value());
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,635评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,628评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,971评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,986评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,006评论 6 394
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,784评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,475评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,364评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,860评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,008评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,152评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,829评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,490评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,035评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,156评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,428评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,127评论 2 356

推荐阅读更多精彩内容