Kafka初试

昨天在测试环境搭建了一套zookeeper+kafka(各一台)的机器,开始进行kafka的实践之旅。昨天下班前一直都出现无法发送无法接收的问题,今天终于搞定了。

zookeeper的安装

直接从官网下载bin包后,解压即可

tar -zxvf zookeeper-3.4.9.tar.gz

需要修改的配置有:

  1. 把conf目录下的zoo_sample.cfg改名为zoo.cfg(并修改dataDir)
  2. 修改bin目录下的zkEnv.sh脚本中的ZOO_LOG_DIR和ZOO_LOG4J_PROP

启动zookeeper

bin/zkServer.sh start

Kafka的安装

由于只使用了一个broker,所以直接解压包

tar -zxvf kafka_2.11-0.10.2.0.tgz

需要修改的配置为config/server.properties文件,主要修改的有log.dirs和listeners。

listeners=PLAINTEXT://localhost:9092

这里有个坑,server.properties中一定要配置host.name或者listeners,不然会出现无法收发消息的现象
然后启动即可

bin/kafka-server-start.sh config/server.properties &

客户端

安装完以后需要写生产者的消费者了,直接用最简单的方法来写。

Producer

package producer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class Producer {
  public static void main(String[] args) throws InterruptedException, ExecutionException {
    Properties props = new Properties();
    props.put("bootstrap.servers","122.20.109.68:9092");
    props.put("acks","1");
    props.put("retries","0");
    props.put("batch.size","16384");
// props.put("linger.ms","1");
// props.put("buffer.memory","33554432");
    props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//生产者的建立
    KafkaProducer producer = new KafkaProducer<>(props);

    for (int i=0;i<100;i++) {
      System.out.println("seding message "+i);
      ProducerRecord record = new ProducerRecord("testTopic",String.valueOf(i),"this is message"+i);
      producer.send(record, new Callback() {
        public void onCompletion (RecordMetadata metadata, Exception e) {
          if (null != e) {
            e.printStackTrace();
          } else {
            System.out.println(metadata.offset());
          }
        }
      });
    }
    Thread.sleep(100000);
    producer.close();
  }
} 

这里有个坑,如果我直接用producer.send(ProducerRecord)方法,发完100条以后producer.close(),会导致Kafka无法收到消息,怀疑是异步发送导致的,需要真的发送到Kafka以后才能停止Producer,所以我在后面sleep了一下,加上以后就可以正常发送了。
使用callback是异步发送,此外还能使用同步发送,直接在send方法后加上一个get方法就会直接阻塞直到broker返回消息已收到。

producer.send(record).get();

Producer的properties有几个常用配置:

  • bootstrap.servers:Kafka集群连接串,可以由多个host:port组成
  • acks:broker消息确认的模式,有三种:
    0:不进行消息接收确认,即Client端发送完成后不会等待Broker的确认
    1:由Leader确认,Leader接收到消息后会立即返回确认信息
    all:集群完整确认,Leader会等待所有in-sync的follower节点都确认收到消息后,再返回确认信息
    我们可以根据消息的重要程度,设置不同的确认模式。默认为1
  • retries:发送失败时Producer端的重试次数,默认为0
  • batch.size:当同时有大量消息要向同一个分区发送时,Producer端会将消息打包后进行批量发送。如果设置为0,则每条消息都DuLi发送。默认为16384字节
  • linger.ms:发送消息前等待的毫秒数,与batch.size配合使用。在消息负载不高的情况下,配置linger.ms能够让Producer在发送消息前等待一定时间,以积累更多的消息打包发送,达到节省网络资源的目的。默认为0
  • key.serializer/value.serializer:消息key/value的序列器Class,根据key和value的类型决定
  • buffer.memory:消息缓冲池大小。尚未被发送的消息会保存在Producer的内存中,如果消息产生的速度大于消息发送的速度,那么缓冲池满后发送消息的请求会被阻塞。默认33554432字节(32MB)

Consumer

package consumer;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;


public class Consumer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers","122.20.109.68:9092");
    props.put("group.id","test");
    props.put("enable.auto.commit","true");
    props.put("auto.commit.interval.ms","1000");
    props.put("session.timeout.ms","30000");
    props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("testTopic"));
    while(true) {
      ConsumerRecords records = consumer.poll(1000);
      for (ConsumerRecord record: records) {
        System.out.println("offset "+record.offset()+" Message: "+record.value());
      }
    }
  }
}

Consumer的Properties的常用配置有:

  • bootstrap.servers/key.deserializer/value.deserializer:和Producer端的含义一样,不再赘述
  • fetch.min.bytes:每次最小拉取的消息大小(byte)。Consumer会等待消息积累到一定尺寸后进行批量拉取。默认为1,代表有一条就拉一条
  • max.partition.fetch.bytes:每次从单个分区中拉取的消息最大尺寸(byte),默认为1M
  • group.id:Consumer的group id,同一个group下的多个Consumer不会拉取到重复的消息,不同group下的Consumer则会保证拉取到每一条消息。注意,同一个group下的consumer数量不能超过分区数。
  • enable.auto.commit:是否自动提交已拉取消息的offset。提交offset即视为该消息已经成功被消费,该组下的Consumer无法再拉取到该消息(除非手动修改offset)。默认为true
  • auto.commit.interval.ms:自动提交offset的间隔毫秒数,默认5000。

参考:http://www.cnblogs.com/edison2012/p/5774207.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,558评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,002评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,024评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,144评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,255评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,295评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,068评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,478评论 1 305
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,789评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,965评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,649评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,267评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,982评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,800评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,847评论 2 351

推荐阅读更多精彩内容

  • Kafka入门经典教程-Kafka-about云开发 http://www.aboutyun.com/threa...
    葡萄喃喃呓语阅读 10,814评论 4 54
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,644评论 18 139
  • 本文转载自http://dataunion.org/?p=9307 背景介绍Kafka简介Kafka是一种分布式的...
    Bottle丶Fish阅读 5,467评论 0 34
  • 背景介绍 Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下: 以时间复杂度为O...
    高广超阅读 12,827评论 8 167
  • 一、入门1、简介Kafka is a distributed,partitioned,replicated com...
    HxLiang阅读 3,345评论 0 9