这一篇主要介绍如何编写 Kafka 的 producer 代码
照着我人生导师的代码照猫画虎来的~
/**
 * Thread that reads a text file line by line and publishes each line to Kafka.
 *
 * <p>Every record is sent without a key; its value is {@code <lineNumber>---<lineText>},
 * with line numbers starting at 1. The broker address ({@code host:port}) and the input
 * file path ({@code /path/to/file}) are placeholders that must be replaced before use.
 */
public class TestProducer extends Thread {
    private final KafkaProducer<String, String> producer;
    /** {@code false} requests blocking sends; {@code true} or {@code null} sends without waiting. */
    private final Boolean isAsync;
    /** Destination topic names, split from the comma-separated constructor argument. */
    private final String[] topics;

    /**
     * Creates the producer thread.
     *
     * @param topic   destination topic, or several topic names separated by commas;
     *                every line of the file is published to each of them
     * @param isAsync {@code false} to wait for each send to complete before moving on;
     *                {@code true} (or {@code null}) to hand records to the producer
     *                without waiting for the result
     */
    public TestProducer(String topic, Boolean isAsync) {
        this.topics = topic.split(",");
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "host:port");
        properties.put("client.id", "testProducer");
        // The producer is declared as <String, String>, so both serializers must handle Strings.
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<String, String>(properties);
        this.isAsync = isAsync;
    }

    /**
     * Publishes every line of the input file, then closes the reader and the producer.
     *
     * <p>Stops early if a blocking send fails or the thread is interrupted while waiting.
     * In non-blocking mode the "Success send" message only means the record was handed
     * to the producer, not that delivery was confirmed.
     */
    @Override
    public void run() {
        File file = new File("/path/to/file"); // location of the file whose lines are published
        BufferedReader reader = null;
        try {
            reader = new BufferedReader(new FileReader(file));
            String text;
            int sent = 0; // number of lines published so far
            while ((text = reader.readLine()) != null) {
                int lineNo = sent + 1;
                if (!publishToAllTopics(lineNo + "---" + text)) {
                    break;
                }
                System.out.println("Success send [" + lineNo + "] message..");
                sent++;
            }
            System.out.println("Total send [" + sent + "] message..");
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (reader != null) {
                    reader.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                // Always release the producer, even when reading or sending failed.
                producer.close();
            }
        }
    }

    /** Sends the payload to every configured topic; returns false as soon as one send fails. */
    private boolean publishToAllTopics(String payload) {
        for (String name : topics) {
            String target = name.trim();
            if (target.isEmpty()) {
                continue; // tolerate stray commas such as "a,,b"
            }
            if (!publish(target, payload)) {
                return false;
            }
        }
        return true;
    }

    /** Sends one record, waiting for completion only when isAsync is explicitly false. */
    private boolean publish(String target, String payload) {
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(target, payload);
        if (!Boolean.FALSE.equals(isAsync)) {
            producer.send(record);
            return true;
        }
        try {
            // Block on the returned future so a failed delivery surfaces here.
            producer.send(record).get();
            return true;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up can observe it.
            Thread.currentThread().interrupt();
            return false;
        } catch (java.util.concurrent.ExecutionException e) {
            e.printStackTrace();
            return false;
        }
    }
}
/**
*main函数
**/
public class Main {
public static void main(String[] args) {
TestProducer test = new TestProducer("topic50", false);
test.start();
}
}