实践
创建项目
$ $ mvn archetype:generate -DgroupId=com.kafkademo -DartifactId=myproducer -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
添加依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
Producer 代码:
package com.kafkademo;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class ProducerTest {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "server191:9092,server192:9092,server195:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "-1");
props.put("retries", 3);
props.put("batch.size", 323840);
props.put("linger.ms", 10);
props.put("buffer.memory", 33554432);
props.put("max.block.ms", 3000);
Producer<String, String> producer = new KafkaProducer(props);
for (int i = 0; i < 10; i++) {
ProducerRecord record = new ProducerRecord("my-topic", Integer.toString(i), Integer.toString(i));
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("发送成功");
} else {
System.out.println("发送失败 " + exception.getMessage());
}
}
});
}
producer.close();
}
}
说明
构建 Producer 的步骤
- 构造 properties 对象,设置好相关参数。
bootstrap.servers 是必须设置的,指定 kafka broker 连接信息,如果 broker 很多,也不需要全部列出来,列出一部分即可,因为不管列几个,都可以根据列出的找到所有的,多列几个的目的是故障转移,如果只列一个,那万一这个挂了就连不上了。
key.serializer 和 value.serializer 也是必须设置的,用来对 key 和 value 进行序列化,因为发送到broker的任何消息的格式都必须是字节数组。
- 使用 properties 对象构造 KafkaProducer 对象。
- 构造消息对象 ProducerRecord,指定目标 topic、分区、key、value,分区和key不是必须的。
- 调用 KafkaProducer 的 send 方法发送消息。
发送方式有异步和同步,上面的示例代码中使用了异步方式,同步方式的用法:
...
producer.send(record).get();
···
同步方式其实也是异步的,send 方法返回 Future 对象,Future.get 会一直等待,直至等到返回结果,如果出错,会抛出异常。
- 关闭 KafkaProducer。