昨天在测试环境搭建了一套zookeeper+kafka(各一台)的机器,开始进行kafka的实践之旅。昨天下班前一直都出现无法发送无法接收的问题,今天终于搞定了。
zookeeper的安装
直接从官网下载bin包后,解压即可
tar -zxvf zookeeper-3.4.9.tar.gz
需要修改的配置有:
- 把conf目录下的zoo_sample.cfg改名为zoo.cfg(并修改dataDir)
- 修改bin目录下的zkEnv.sh脚本中的ZOO_LOG_DIR和ZOO_LOG4J_PROP
启动zookeeper
bin/zkServer.sh start
Kafka的安装
由于只使用了一个broker,所以直接解压包
tar -zxvf kafka_2.11-0.10.2.0.tgz
需要修改的配置为config/server.properties文件,主要修改的有log.dirs和listeners。
listeners=PLAINTEXT://localhost:9092
这里有个坑,server.properties中一定要配置host.name或者listeners,不然会出现无法收发消息的现象
然后启动即可
bin/kafka-server-start.sh config/server.properties &
客户端
安装完以后需要写生产者的消费者了,直接用最简单的方法来写。
Producer
package producer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class Producer {
public static void main(String[] args) throws InterruptedException, ExecutionException {
Properties props = new Properties();
props.put("bootstrap.servers","122.20.109.68:9092");
props.put("acks","1");
props.put("retries","0");
props.put("batch.size","16384");
// props.put("linger.ms","1");
// props.put("buffer.memory","33554432");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//生产者的建立
KafkaProducer producer = new KafkaProducer<>(props);
for (int i=0;i<100;i++) {
System.out.println("seding message "+i);
ProducerRecord record = new ProducerRecord("testTopic",String.valueOf(i),"this is message"+i);
producer.send(record, new Callback() {
public void onCompletion (RecordMetadata metadata, Exception e) {
if (null != e) {
e.printStackTrace();
} else {
System.out.println(metadata.offset());
}
}
});
}
Thread.sleep(100000);
producer.close();
}
}
这里有个坑,如果我直接用producer.send(ProducerRecord)方法,发完100条以后producer.close(),会导致Kafka无法收到消息,怀疑是异步发送导致的,需要真的发送到Kafka以后才能停止Producer,所以我在后面sleep了一下,加上以后就可以正常发送了。
使用callback是异步发送,此外还能使用同步发送,直接在send方法后加上一个get方法就会直接阻塞直到broker返回消息已收到。
producer.send(record).get();
Producer的properties有几个常用配置:
- bootstrap.servers:Kafka集群连接串,可以由多个host:port组成
- acks:broker消息确认的模式,有三种:
0:不进行消息接收确认,即Client端发送完成后不会等待Broker的确认
1:由Leader确认,Leader接收到消息后会立即返回确认信息
all:集群完整确认,Leader会等待所有in-sync的follower节点都确认收到消息后,再返回确认信息
我们可以根据消息的重要程度,设置不同的确认模式。默认为1 - retries:发送失败时Producer端的重试次数,默认为0
- batch.size:当同时有大量消息要向同一个分区发送时,Producer端会将消息打包后进行批量发送。如果设置为0,则每条消息都DuLi发送。默认为16384字节
- linger.ms:发送消息前等待的毫秒数,与batch.size配合使用。在消息负载不高的情况下,配置linger.ms能够让Producer在发送消息前等待一定时间,以积累更多的消息打包发送,达到节省网络资源的目的。默认为0
- key.serializer/value.serializer:消息key/value的序列器Class,根据key和value的类型决定
- buffer.memory:消息缓冲池大小。尚未被发送的消息会保存在Producer的内存中,如果消息产生的速度大于消息发送的速度,那么缓冲池满后发送消息的请求会被阻塞。默认33554432字节(32MB)
Consumer
package consumer;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class Consumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers","122.20.109.68:9092");
props.put("group.id","test");
props.put("enable.auto.commit","true");
props.put("auto.commit.interval.ms","1000");
props.put("session.timeout.ms","30000");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("testTopic"));
while(true) {
ConsumerRecords records = consumer.poll(1000);
for (ConsumerRecord record: records) {
System.out.println("offset "+record.offset()+" Message: "+record.value());
}
}
}
}
Consumer的Properties的常用配置有:
- bootstrap.servers/key.deserializer/value.deserializer:和Producer端的含义一样,不再赘述
- fetch.min.bytes:每次最小拉取的消息大小(byte)。Consumer会等待消息积累到一定尺寸后进行批量拉取。默认为1,代表有一条就拉一条
- max.partition.fetch.bytes:每次从单个分区中拉取的消息最大尺寸(byte),默认为1M
- group.id:Consumer的group id,同一个group下的多个Consumer不会拉取到重复的消息,不同group下的Consumer则会保证拉取到每一条消息。注意,同一个group下的consumer数量不能超过分区数。
- enable.auto.commit:是否自动提交已拉取消息的offset。提交offset即视为该消息已经成功被消费,该组下的Consumer无法再拉取到该消息(除非手动修改offset)。默认为true
- auto.commit.interval.ms:自动提交offset的间隔毫秒数,默认5000。