本文介绍Kafka生产者的多种实现方式。
方式1:
编写一个Producer类继承KafkaProducer:
/**
 * Kafka producer with {@code String} keys and {@code String} values, implemented as a
 * direct subclass of {@link KafkaProducer}.
 *
 * <p>Both {@code send} overloads simply forward to the parent implementation; they are
 * spelled out so that a subclass-specific hook can be added in one place later.
 */
public class Producer extends KafkaProducer<String, String> {

    /**
     * Builds the producer from the supplied client configuration.
     *
     * @param properties configuration passed unchanged to the {@link KafkaProducer} constructor
     */
    public Producer(Properties properties) {
        super(properties);
    }

    /**
     * Sends a record without a completion callback.
     *
     * @param record the record to send
     * @return the future returned by the parent {@code send(record)}
     */
    @Override
    public Future<RecordMetadata> send(ProducerRecord<String, String> record) {
        final Future<RecordMetadata> pending = super.send(record);
        return pending;
    }

    /**
     * Sends a record and hands the outcome to {@code callback}.
     *
     * @param record   the record to send
     * @param callback callback forwarded to the parent {@code send(record, callback)}
     * @return the future returned by the parent {@code send(record, callback)}
     */
    @Override
    public Future<RecordMetadata> send(ProducerRecord<String, String> record, Callback callback) {
        final Future<RecordMetadata> pending = super.send(record, callback);
        return pending;
    }
}
编写ProduceUtils工具类:
public class ProduceUtils {
private static ProduceUtils instance;
private static Producer producer;
//获取实例
public static synchronized ProduceUtils getInstance() {
if (instance == null) {
instance = new ProduceUtils();
System.out.println("初始化 kafka producer...");
}
return instance;
}
/**
* 初始化
*/
public void init() {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, PropertyUtil.getInstance().getValueByKey("brokerList"));
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "rawMessage");// 自定义客户端id
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// key
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// value
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
// properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.knowyou.tt.NewPartitioner");//自定义分区函数
producer = new Producer(properties);
System.out.println("loading the properities......");
}
/**
* 发送主题和数据
*
* @param topic
* @param value
*/
public void send(String topic, String value) {
try {
producer.send(new ProducerRecord<String, String>(topic, value));
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 发送主题,数据,和数据
*
* @param topic
* @param key
* @param value
*/
public void send(String topic, String key, String value) {
try {
producer.send(new ProducerRecord<String, String>(topic, value));
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* @param topic
* @param key
* @param value
*/
public void sendCallBack(String topic, String key, String value) {
try {
producer.send(new ProducerRecord<String, String>(topic, value), new ProduceCallback());
} catch (Exception e) {
e.printStackTrace();
}
}
编写回调函数Callback:
/**
 * Send-completion callback that logs whether a record was sent successfully.
 */
public class ProduceCallback implements Callback {
    private static final Logger log = LoggerFactory.getLogger(ProduceCallback.class);

    /**
     * Logs the outcome of one send.
     *
     * @param metadata  metadata of the sent record (not used here)
     * @param exception the failure, or {@code null} when the send succeeded
     */
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // Pass the exception to the logger so the cause and stack trace are kept.
            log.error("send message failed", exception);
            return;
        }
        log.info("send message success");
    }
}
先调用ProduceUtils.getInstance().init()完成初始化,再调用send或sendCallBack方法即可实现生产功能。
方式2:
编写一个生产者的工厂类:
public class ProduceFactory {
private static final Logger logger = Logger.getLogger(ProduceFactory.class);
private static KafkaProducer kafkaProducer = null;
private static ProduceFactory produceFactory = null;
private ProduceFactory() {
}
/**
* 单例
*
* @return
*/
public static ProduceFactory getInstance() {
if (produceFactory == null) {
synchronized (ProduceFactory.class) {
if (produceFactory == null) {
produceFactory = new ProduceFactory();
}
}
}
return produceFactory;
}
/**
* 初始化kafka生产者
*
* @throws Exception
*/
public void init() throws Exception {
try {
Properties properties = new Properties();
InputStream stream = this.getClass().getClassLoader().getResourceAsStream("kafka.properties");
properties.load(stream);
kafkaProducer = new KafkaProducer(properties);
} catch (Exception e) {
logger.error("kafka produce init error:" + e.getMessage());
}
}
/**
* @param topic
* @param key
* @param value
*/
public void send(String topic, String key, String value) {
ProducerRecord record;
try {
record = new ProducerRecord<>(topic, key, value);
kafkaProducer.send(record, new SendCallback(record, 0));
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* @param topic
* @param value
*/
public void send(String topic, String value) {
ProducerRecord record;
try {
record = new ProducerRecord<>(topic, value);
kafkaProducer.send(record, new SendCallback(record, 0));
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* producer回调
*/
static class SendCallback implements Callback {
ProducerRecord<String, String> record;
int sendSeq = 0;
public SendCallback(ProducerRecord record, int sendSeq) {
this.record = record;
this.sendSeq = sendSeq;
}
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
//send success
if (null == e) {
String meta = "topic:" + recordMetadata.topic() + ", partition:"
+ recordMetadata.topic() + ", offset:" + recordMetadata.offset();
logger.info("send message success, record:" + record.toString() + ", meta:" + meta);
return;
}
//send failed
logger.error("send message failed, seq:" + sendSeq + ", record:" + record.toString() + ", errmsg:" + e.getMessage());
if (sendSeq < 1) {
kafkaProducer.send(record, new SendCallback(record, ++sendSeq));
}
}
}
方式3:采用spring-kafka的方式生产数据
先做一个工厂类:
/**
 * Factory for spring-kafka {@link KafkaTemplate} instances with String keys and values.
 */
public class ProduceFactory {

    /**
     * Creates a new template backed by a new {@link DefaultKafkaProducerFactory}.
     * Each call builds a fresh template and factory; callers should keep and reuse the result.
     *
     * @return a template constructed with the auto-flush flag set to {@code true}
     */
    public static KafkaTemplate<String, String> getKafkaTemplate() {
        // Explicit type arguments avoid the raw-type unchecked conversion.
        DefaultKafkaProducerFactory<String, String> producerFactory =
                new DefaultKafkaProducerFactory<String, String>(getConfigs());
        return new KafkaTemplate<String, String>(producerFactory, true);
    }

    /**
     * Builds the producer configuration.
     *
     * @return a new mutable map of producer settings
     */
    private static Map<String, Object> getConfigs() {
        Map<String, Object> properties = new HashMap<String, Object>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.10:6667,192.168.1.11:6667,192.168.1.12:6667");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "kafka");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
        return properties;
    }
}
再做一个工具类:
public class KafkaTool {
private static KafkaTemplate<String, String> template = ProduceFactory.getKafkaTemplate();
public static void sendData(final String topic, final String line) {
template.execute(new KafkaOperations.ProducerCallback() {
public Object doInKafka(Producer producer) {
producer.send(new ProducerRecord<String, String>(topic, line), new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
System.out.println("fail to send kafka");
}
}
});
return null;
}
});
}
}