Kafka入门摘要

本文由叩丁狼教育Java大神班六期杨光同学原创

安装kafka:

kafka的使用需要zookeeper,默认的kafka包中自带zookeeper。但是有局限性。

安装注意:
1:需要自己写zookeeper关联的配置文件zoo.cnf

tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=20(从节点与主节点建立初始化连接的时间上限)
syncLimit=5(从节点与主节点不同步状态的时间上线)

群组需要额外配置的:

server.1=zoo1.example.com:2888:3888
server.2=zoo2.example.com:2888:3888
server.3=zoo3.example.com:2888:3888

(server.X=hostname:peerPort:leaderPort X:服务器ID,hostname:服务器的及其名或IP地址,peerPort:节点间通信的TCP端口,leaderPort:首领选举的TCP端口)
每个服务器必须在data Dir目录中创建一个叫做myid的文件,文件需要包含服务器ID,与配置文件中的ID保持一致

kafka的broker选举成为控制器机制:

核心思想是:首先创建一个ephemeral(短暂的)节点,例如“/election”,然后每一个服务器在此节点下创建一个sequence/ephemeral类型的节点,例如”/election/n_”.在sequence标志下,Zookeeper将自动的为每一个Zookeeper服务器分配一个比前一个序号要大的序号。此时创建节点的Zookeeper服务器中拥有最小序号编号的为控制器。

实际操作中还需要保障,当控制器服务器发生故障的时候,能够快速的选出下一个Zookeeper服务器作为控制器。一个简单的解决方案就是:所有的follwer监视控制器所对应的节点。当控制器发生故障时,所对应的控制器临时节点被删除,此时触发所有监视控制器服务器的watch。这样这些服务器收到控制器故障消息,并进行下一次的控制器选举操作。但是这样会引发“从众效应”的发生,尤其是当集群中服务器众多,带宽延时较大的时候,更为明显。

Zookeeper为了避免从众效应的产生,这样实现:每个follower对follwer集群中对应比自己节点序号小一号的节点(也就是对应序号比自己小的节点中的序号最大的节点)设置一个watch。只有当follower所设置的watch被触发的时候,他才进行控制器选举操作,一般情况下它将成为下一个控制器

Kafka的broker配置

  1. broker.id 每个broker的标识符(从0开始的任意整数,集群中唯一)
  2. port 默认9092.注意:使用1024以下端口需要root权限
zookeeper.connect=hostname:port/path
hostname:    Zookeeper服务器的机器名或IP地址
port:        Zookeeper客户端的链接端口号
/path        可选Zookeeper路径,作为Kafka集群的chroot环境,默认跟路径

  1. log.dirs 逗号分隔路径,“最少使用原则”,把同一个分区的日志片段保存到同一个路径下。
  2. num.recovery.threads.per.data.dir 每个数据dir恢复的时候需要使用的线程数服务器正常启动,用于打开每个分区的日志片段
  3. auto.create.topics.enable
    :当一个生产者开始往主题写入消息时
    :当一个消费者开始从主题读取消息时
    :当任意一个客户端向主题发送元数据请求时

Topic的配置

  1. num.partitions
  2. log.retention.ms
  3. log.retention.bytes(上面两个参数,满足任意一个消息就会被删除)
  4. log.segment.bytes
  5. log.segment.ms(上面两个参数,满足任意一个消息片段则会被关闭)
  6. message.max.bytes(消息大小)

生产者概览

[图片上传失败...(image-bb2c4f-1538215358853)]

(kafka发送消息的主要步骤)

创建Kafka生产者

3个必选属性:
1:bootstrap.servers broker地址,建议两个及以上
2:key.serializer
3:value.serializer

创建生产者对象:

Properties props=new Properties();
props.put("bootstrap.servers","broker1:9092,broker2.9092");
props.put("key.serializer",".........StringSerializer");
props.put("value.serializer","......StringSerializer");
KafkaProducer<String,String> producer=new KafkaPropducer<>(props);

  • 发送消息的三种用法

    ProducerRecord<String,String> record=new ProducerRecord<>("topicName","messageKey","messageValue");
    producer.send(record);
    
    

注意:
1:key值相同的会被分配到同一个partition中,但是如果topic增加了partition那么就不能保证了。
2:发送的消息会先放到缓冲区,然后使用单独线程发送到服务器端(默认配置)
3:send()方法会返回一个包含RecordMetadata的Future对象,可以知道是否成功。如果不关心返回结果则可以不去管理。

ProducerRecord<String,String> record=new ProducerRecord<>("topicName","messageKey","messageValue");
record.send(record).get();

注意:
1:send()方法返回一个Future对象
2:调用Future对象,调用Future对象的get()方法等待Kafka响应,如果服务器返回错误,get()方法会抛出异常,如果没有发生错误,我们会得到一个RecordMetadate对象,可用来获取消息的偏移量

private class DemoProducerCallback implements Callback{
    @Override
    public void onCompletion(RecordMetadata recordMetadata,Exception e){
        if(e!=null){
            e.printStackTrace();
        }
    }
}

ProducerRecord<String,Stirng> record=new ProducerRecord<>("topicName","messageKey","messageValue");
producer.send(record,new DemoProducerCallback());

题外话:
在我们的producer发送消息到kafka上之后,成功了则返回包含RecordMetadate的Future,失败了抛出异常,但是如何判断消息发送成功了?参数”request.required.acks”值来确定。参数值为0:不管结果,发送了则表示成功。1:一个副本确认了则证明成功。-1:所有的都要确认了才表示成功。

同步发送,异步发送,oneway发送三种方式
[图片上传失败...(image-872404-1538215358852)]

该字段用来表示是同步发送还是异步发送,异步发送可以设置批量发送消息,但是数据丢失可能性也会大一些。
异步发送其他的参数:
[图片上传失败...(image-3655e6-1538215358852)]

一般对各模式的对应配置

分区器:
实现Partitioner接口,有三个方法:configure,partition,close。实现partition方法即可。

Kafka消费者

kafka消费者从属于消费者群组consumerGroup,一个群组中订阅的是同一个主题,每个消费者接受主题的一部分分区的消息。每个群组都有个群组协调器broker
注意点:
1、第一个consumer成为群组协调器,协调器进行给consumer进行分配partition
2、消费者可以横向进行拓展(不建议频繁操作)
3、ConsumerGroup中的consumer数量不要超过Topic中的partition数量
4、同一个Topic中的单一partition不会同时分配到同一个ConsumerGroup中的不同的consumer
5、如何判断一个consumer在线:向群组协调器的broker发送心跳(轮训获取消息和提交偏移量的时候)。
6、有consumer挂了,就会进行一次再均衡。

创建Kafka消费者:

Properties prop=new Properties();
prop.put("bootstrap.servers","broker1.9092,broker2.9092");
prop.put("group.id","testGroup");
prop.put("key.deserializer","...");
prop.put("value.deserializer","...");

KafkaConsumer<String,String> consumer=new KafkaConsumer<>(prop);

try{
   while(true){//循环轮训,防止被认为consumer死了
      ConsumerRecords<String,String> records=consumer.poll(100);//发送心跳在此处
      for(ConsumerRecord<String,String> record:records){
         String topic=record.topic();
         Integer partition=record.partition();
         Integer offset=record.offset();
         String key=record.key();
         String value=record.value();
      }
   }
}finally{
   consumer.close();
}

注意:
1、单个线程只能运行一个消费者

Consumer参数:
1、fetch.min.bytes
2、fetch.max.wait.ms
3、max.partition.fetch.bytes //最大分区获取数据大小,默认1M,如果consumer需要处理4个分区,那就是4M数据,数据过大会导致consumer消费时间过长影响session.timeout.ms
4、session.timeout.ms //消费者被认为死亡之前可以与服务器断开连接的时间
5、auto.offset.reset //没有偏移量的分区或者偏移量无效(消费者长时间失效,偏移量记录过时并被删除),该如何处理,默认latest(最近的),还有earliest(起始位置)。
6、enbale.auto.commit //是否自动提交偏移量true或者false,如果设置为true需要auto.commit.interval.ms来决定提交频率
7、partition.assignment.strategy //重分配的分配策略
Range:partitionNum/consumerNum多余的会按照顺序多分配(默认)
RoundRobin:按照顺序循环分配
8、client.id //broker用来标识来源,主要用于日志,度量指标和配额中
9、max.poll.records //每次poll的最大数量
10、receive.buffer.bytes和send.buffer.bytes //TCP缓冲区的大小,设置为-1则使用系统默认的

偏移量:

consumer提交偏移量原理:
向名为consumer_offset的topic发送消息,包含每个partition的offset。如果consumer一直在运行则没什么用处,但是如果发生了再均衡,则会读取该topic中的partition最后一次提交的offset进行继续处理。

自动提交:
设置enable.auto.commit为true则进行自动提交,提交频率auto.commit.interval.ms单位为妙,默认5.
提交当前偏移量:
将auto.commit.offset设置为false,让应用决定何时设置偏移量。使用commitSync()提交偏移量,此方法会提交由poll方法返回的最新偏移量。

while(true){
   ConsummerRecords<String,String> records=consumer.poll(100);
   for(ConsummerRecord record:records){
      //获取数据,进行处理
   }
   try{
      consumer.commitSync();
   }catch(CommitFailedException e){
      log.error("commit failed",e);
   }
}
commitAsync(); 进行异步提交
while(true){
   ConsummerRecords<String,String>  records=consumer.poll(100);
   for(ConsummerRecord record:records){
      //获取数据,进行处理
   }
   consumer.commitAsync();
}

提交之后不必等待,直接去操作接下来的执行动作。但是他也有弊端。
解决方法:

while(true){
   ConsummerRecords<String,String> records=consumer.poll(100);
   for(ConsummerRecord record:records){
      //获取数据,进行处理
   }
   consumer.commitAsync(new OffsetCommitCallback(){//回调方法用于记录错误和重试,回调函数成不成功都执行
      public void onComplete(Map<TopicPartition,OffsetAndMetadata> offsets,Exception e){
         if(e!=null){
            log.error("...");
         }
      }
   });
}

手动提交偏移量

private Map<TopicPartition,OffsetAndMetadata> currentOffsets=new HashMap<>();
int count=0;
while(true){
   ConsummerRecords<String,String> records=consumer.poll(100);
   for(ConsummerRecord record:records){
      //获取数据,进行处理
      currentOffsets.put(new TopicPartition(record.topic(),record.partion()),new OffsetAndMetadata(record.offset()+1,"no metada"));
      if(count%1000==0){
         consumer.commitAsync(currentOffsets,null);//null类型为OffsetCommitCallback
      }
   count++;
   }
}

再均衡监听器

进行分配新分区或者移除旧分区的时候导致的再均衡。
实现:调用subscribe()方法的时候参数传:ConsumerRebalanceListener实例就可以了。该接口有两个待实现接口:
1)void onPartitionsRevoked(Collection<TopicPartition> partitions)
该方法在再均衡开始之前和消费者停止读取消息之后被调用。在此处提交偏移量,下一个接管分区的消费者就知道该从哪里开始读取了。
2)void onPartitionsAssigned(Collection<TopicPartitino> partitions)
该方法会在再均之后和消费者开始读取消息之前被调用

示例:

private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new Hash<>();

private class HandleRebalance implements ConsumerRebalanceListener {
    private void onPartitionsAssigned(Collection<TopicPartition> partitions) {

    }

    private void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        consumer.commitSync(currentOffsets);
    }
}
int count = 0;
try {
    consumer.subscribe(topics, new HandleRebalance());// 传再均衡监听器
    while (true) {
        ConsummerRecords<String, String> records = consumer.poll(100);
        for (ConsummerRecord record : records) {
            // 获取数据,进行处理
            currentOffsets.put(new TopicPartition(record.topic(), record.partion()),
                    new OffsetAndMetadata(record.offset() + 1, "no metada"));
            if (count % 1000 == 0) {
                consumer.commitAsync(currentOffsets, null);// null类型为OffsetCommitCallback
            }
            count++;
        }
    }
} catch (WakeupException e) {
    // 忽略异常,正在关闭消费者
} catch (Exception e) {
    log.error("Unexceped error", e);
} finally {
    try {
        consumer.commitSync(cuurentOffsets);
    } finally {
        consumer.close();
    }
}

将偏移量保存到数据库,从数据库读取偏移量
需要改动的在实现ConsumerRebalanceListener接口的实现类中做修改:

private class HandleRebalance implements ConsumerRebalanceListener {
    private void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        commitDBTransaction();// 在处理数据之前开启事务,在结束之前将事务提交
    }

    private void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            consumer.seek(partition, getOffsetFromDB(partition));
        }
    }
}

退出:
consumer.wakeup()是消费者唯一一个可以从其他线程里安全调用的退出方法;

final Thread mainThread = Tread.currentThread();

Runtime.getRuntime().addShutdownHook(new Thread() {
    public void run() {
        log.info("Starting exist...");
        consumer.wakeup();
        try {
            mainThread.join();
        } catch (InterruptedException e) {
            log.error("UnexceptedException", e);
        }
    }
});

想获取更多技术视频,请前往叩丁狼官网:http://www.wolfcode.cn/openClassWeb_listDetail.html

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,658评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,482评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,213评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,395评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,487评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,523评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,525评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,300评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,753评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,048评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,223评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,905评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,541评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,168评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,417评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,094评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,088评论 2 352

推荐阅读更多精彩内容

  • 姓名:周小蓬 16019110037 转载自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw阅读 34,720评论 13 425
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,646评论 18 139
  • Kafka入门经典教程-Kafka-about云开发 http://www.aboutyun.com/threa...
    葡萄喃喃呓语阅读 10,821评论 4 54
  • kafka的定义:是一个分布式消息系统,由LinkedIn使用Scala编写,用作LinkedIn的活动流(Act...
    时待吾阅读 5,314评论 1 15
  • 斑驳的城墙,老旧的木房 那把生锈的钥匙 如何打开掩藏的门与窗 在深冷的夕日 这里有炉火煮茶飘出的沁香 在不语的午后...
    小米小丸子阅读 227评论 1 6