Kafka的生产者和消费者实例

Kafka的生产者和消费者实例

Producer Demo演示

导入Maven依赖

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.0.0</version>
        </dependency>

ProducerTest.java

public class ProducerTest {
    public static void main(String[] args) throws Exception {

        // 创建生产者
        Properties props = new Properties();

        // 这里可以配置一个或多个broker地址,会自动从broker去拉取元数据进行缓存
        props.put("bootstrap.servers", "bigdata02:9092,bigdata03:9092,bigdata04:9092,bigdata05:9092");
        // 这里是把发送的key从字符串序列化为字节数组
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 这里是把发送的message从字符串序列化为字节数组
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 创建一个Producer实例:线程资源,跟各个broker建立socket连接资源
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        //ProducerRecord<String, String> record = new ProducerRecord<>(
        //        "test", "message1");
        ProducerRecord<String, String> record = new ProducerRecord<>(
                 "test","key1", "message2");
        /**
         *
         * 如果发送消息,消息不指定key,那么我们发送的这些消息,会被轮训的发送到不同的分区。
         * 如果指定了key。发送消息的时候,客户端会根据这个key计算出来一个hash值,
         * 根据这个hash值会把消息发送到对应的分区里面。
         */

        //kafka发送数据有两种方式:
        //1:异步的方式。
        // 这是异步发送的模式
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    // 消息发送成功
                    System.out.println("消息发送成功");
                } else {
                    // 消息发送失败,需要重新发送
                }
            }

        });

        Thread.sleep(10 * 1000);

        //第二种方式:这是同步发送的模式
//      producer.send(record).get();
        // 你要一直等待人家后续一系列的步骤都做完,发送消息之后
        // 有了消息的回应返回给你,你这个方法才会退出来

        producer.close();
    }
}

Consumer Demo演示

public class ConsumerTest {
    public static void main(String[] args) {

        // 第一步:创建消费者
        Properties props = new Properties();

        // 这里可以配置一个或多个broker地址,会自动从broker去拉取元数据进行缓存
        props.put("bootstrap.servers", "bigdata02:9092,bigdata03:9092,bigdata04:9092,bigdata05:9092");
        // 指定消费组的id
        String groupId = "testtest";
        props.put("group.id", groupId);
        // 这里是把发送的key从字节数组反序列化为字符串
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 这里是把发送的message从字节数组反序列化为字符串
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        String topicName = "test";
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // 第二步:指定消费者主题
        // 一个消费者可以同时消费多个主题
        consumer.subscribe(Arrays.asList(topicName));

        try {
            while (true) {
                // 第三步:去服务器消费数据
                ConsumerRecords<String, String> records = consumer.poll(1000);// 超时时间
                // 第四步:对数据进行处理
                // 业务逻辑操作
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.offset() + ", " + record.key() + ", " + record.value());
                }
            }
        } catch (Exception e) {

        }


    }

}

Producer消息发送原理

Producer发送一条消息,首先需要选择一个topic的分区,默认是轮询来负载均衡,但是如果指定了一 个分区key,那么根据这个key的hash值来分发到指定的分区,这样可以让相同的key分发到同一个分区 里去,还可以自定义partitioner来实现分区策略。

producer.send(msg); // 用类似这样的方式去发送消息,就会把消息给你均匀的分布到各个分区上去
producer.send(key, msg); // 订单id,或者是用户id,他会根据这个key的hash值去分发到某个分 区上去,他可以保证相同的key会路由分发到同一个分区上去

每次发送消息都必须先把数据封装成一个ProducerRecord对象,里面包含了要发送的topic,具体在哪 个分区,分区key,消息内容,timestamp时间戳,然后这个对象交给序列化器,变成自定义协议格式 的数据,接着把数据交给partitioner分区器,对这个数据选择合适的分区,默认就轮询所有分区,或者 根据key来hash路由到某个分区,这个topic的分区信息,都是在客户端会有缓存的,当然会提前跟 broker去获取。接着这个数据会被发送到producer内部的一块缓冲区里,然后producer内部有一个 Sender线程,会从缓冲区里提取消息封装成一个一个的batch,然后每个batch发送给分区的leader副 本所在的broker。

kafka生产者发送消息原理图

Producer核心参

如何提升吞吐量:

buffer.memory:设置发送消息的缓冲区,默认值是33554432,就是32MB

如果发送消息出去的速度小于写入消息进去的速度,就会导致缓冲区写满,此时生产消息就会阻塞住,所以说 这里就应该多做一些压测,尽可能保证说这块缓冲区不会被写满导致生产行为被阻塞住

props.put("buffer.memory",33554432)

compression.type:默认是none,不压缩,但是也可以使用lz4压缩,效率还是不错的,压缩之后可以减小数据量,提升吞吐量,但是会加大producer端的cpu开销

props.put("compression.type","lz4")

batch.size:设置batch的大小,如果batch太小,会导致频繁网络请求,吞吐量下降;如果batch太大,会导致一条消息需要等待很久才能被发送出去,而且会让内存缓冲区有很大压力,过多数据缓冲在内存里

默认值是:16384,就是16kb,也就是一个batch满了16kb就发送出去,一般在实际生产环境,这个batch的值可以增大一些来提升吞吐量.

配合参数linger.ms使用。

linger.ms这个值默认是0,意思就是消息必须立即被发送,但是这是不对的,一般设置一个100毫秒之 类的,这样的话就是说,这个消息被发送出去后进入一个batch,如果100毫秒内,这个batch满了16kb,自 然就会发送出去。但是如果100毫秒内,batch没满,那么也必须把消息发送出去了,不能让消息的发送延迟 时间太长,也避免给内存造成过大的一个压力。

props.put("batch.size",32768)
props.put("linger.ms",100)

自定义分区

1,没有设置key
我们的消息就会被轮训的发送到不同的分区。

2,设置了key
kafka自带的分区器,会根据key计算出来一个hash值,这个hash值会对应某一个分区。
如果key相同的,那么hash值必然相同,key相同的值,必然是会被发送到同一个分区。

3,有些比较特殊的时候,我们就需要自定义分区

自定义方法示例

public class HotDataPartitioner implements Partitioner {
    private Random random;

    @Override
    public int partition(String topic, Object keyObj, byte[] bytes, Object value, byte[] valueBytes, Cluster cluster) {
        String key = (String) keyObj;
        List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(topic);
        // 获取到分区的个数 假设3个 0,1,2
        int partitionCount = partitionInfoList.size();
        // 最后一个分区
        int hotDataPartition = partitionCount - 1;
        // 将key包含"hot_data"的放到最后一个分区
        return !key.contains("hot_data") ? random.nextInt(hotDataPartition) : hotDataPartition;
    }
    
}
//"**.**.HotDataPartitioner"为上述示例的路径
props.put("partitioner.class", "**.**.HotDataPartitioner");

常见的异常处理

生产者在数据发送过程3种常见的异常:

1)LeaderNotAvailableException:
这个就是如果某台机器挂了,此时leader副本不可用,会导致你写入失败.
要等待其他follower副本切换为leader副本之后,才能继续写入,此时可以重试发送即可
如果说你平时重启kafka的broker进程,肯定会导致leader切换,一定会导致你写入报错,是LeaderNotAvailableException。

2)NotControllerException:
这个也是同理,如果说Controller所在Broker挂了,
那么此时会有问题,需要等待Controller重新选举,此时也是一样就是重试即可

3)NetworkException:网络异常 timeout

重试机制参数:

**retries:重试次数,默认值是3 **
retry.backoff.ms:两次重试之间的时间间隔

props.put("retries", 10);
    
props.put("retry.backoff.ms", 300);

生产者重试机制带来的问题:

1. 消息会重复
有的时候一些leader切换之类的问题,需要进行重试,设置retries即可,
消息重试会导致,重复发送的问题,比如说网络抖动一下导致他以为没成功,就重试了,
其实人家都成功了.

2. 消息乱序
消息重试是可能导致消息的乱序的,因为可能排在你后面的消息都发送出去了。

max.in.flight.requests.per.connection参数设置为1,这 样可以保证producer同一时间只能发送一条消息

props.put("max.in.flight.requests.per.connection", 1);

ack参数详解:

1.request.required.acks=0:
只要请求已发送出去,就算是发送完了,不关心有没有写成功。
性能很好,如果是对一些日志进行分析,可以承受丢数据的情况,用这个参数,性能会很好。

2.request.required.acks=1:
发送一条消息,当leader partition写入成功以后,才算写入成功。
不过这种方式以后丢数据的可能,如果刚写入leader,leader就挂了,此时数据必然丢了,其他的follower没收到数据副本,变成leader。

3.request.required.acks=-1:
需要ISR列表里面,所有副本都写完以后,这条消息才算写入成功。
ISR:3个副本。1 leader partition 2 follower partition

request.required.acks是控制发送出去的消息的持久化机制的

props.put("request.required.acks", -1);

如果要想保证数据不丢失,得如下设置:

 a)min.insync.replicas = 2,
ISR里必须有2个副本,一个leader和一个follower,
最最起码的一 个,不能只有一个leader存活,连一个follower都没有了
b)acks = -1,
每次写成功一定是leader和follower都成功才可以算做成功,
leader挂了,follower 上是一定有这条数据,不会丢失
c) retries = Integer.MAX_VALUE,无限重试,
如果上述两个条件不满足,写入一直失败,就会无限次 重试,保证说数据必须成功的发送给两个副本,
如果做不到,就不停的重试,
除非是面向金融级的场景,面向 企业大客户,或者是广告计费,跟钱的计算相关的场景下,才会通过严格配置保证数据绝对不丢失

kafka服务端相关参数:

min.insync.replicas:2, 如果我们不设置的话,默认这个值是1
一个leader partition会维护一个ISR列表,这个值就是限制ISR列表里面
至少得有几个副本,比如这个值是2,那么当ISR列表里面只有一个副本的时候。
往这个分区插入数据的时候会报错。

如果面试官问让你设计一个不丢数据的方案,怎么设计?

1)首先创建主题时候 分区副本>=2 2,3

2)ack = -1 让ISR所有副本都写入成功

3)min.insync.replicas >=2

消费者重要概念原理:

消费者核心参数:

异常感知

heartbeat.interval.ms consumer心跳时间,必须得保持心跳才能知道consumer是否故障了,然后如果故障之后,就会通过心跳下 发rebalance的指令给其他的consumer通知他们进行rebalance的操作

props.put("heartbeat.interval.ms",3000);

session.timeout.ms kafka多长时间感知不到一个consumer就认为他故障了,默认是10秒

props.put("session.timeout.ms", 10*1000);

max.poll.interval.ms 如果在两次poll操作之间,超过了这个时间,那么就会认为这个consume处理能力太弱了,会被踢出消费 组,分区分配给别人去消费,一遍来说结合你自己的业务处理的性能来设置就可以了

props.put("max.poll.interval.ms", 5*1000);

fetch.max.bytes 获取一条消息最大的字节数,一般建议设置大一些, 默认是1M

props.put("fetch.max.bytes", 10*1024*1024); 

max.poll.records 一次poll返回消息的最大条数,默认是500条

props.put("max.poll.records", 1000);

connection.max.idle.ms consumer跟broker的socket连接如果空闲超过了一定的时间,此时就会自动回收连接,但是下次消费就要 重新建立socket连接,这个建议设置为-1,不要去回收

 props.put("connection.max.idle.ms", -1);

根据偏移量消费的策略:

auto.offset.reset
1.earliest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消 费
2.latest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从当前位置 开始消费
3.none
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的 offset,则抛出异常
注:我们生产里面一般设置的是latest

props.put("auto.offset.reset",latest);

enable.auto.commit 开启自动提交偏移量offset

props.put("enable.auto.commit", true);

auto.commit.ineterval.ms 每隔多久提交一次偏移量,默认值5000毫秒

props.put("auto.commit.interval.ms", 5*1000);

消费者组概念

总结:

1)每个consumer都要属于一个consumer.group,就是一个消费组.
topic的一个分区只会分配给一个消费者组下的一个consumer来处理.
每个consumer可能会分配多个分区.
也有可能某个consumer没有分配到任何分区.

2)如果想要实现一个广播的效果,那只需要使用不同的group id去消费就可以。

3)如果consumer.group中某个消费者挂了,此时会自动把分配给他的分区交给其他的消费者,如果他又重启了,那么又会把一些分区重新交还给他.也就是说,一个消费者组内部会自己实现一个重平衡,或者负载均衡的这样一个效果.

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,992评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,212评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,535评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,197评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,310评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,383评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,409评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,191评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,621评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,910评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,084评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,763评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,403评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,083评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,318评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,946评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,967评论 2 351