= Kafka环境搭建
== Docker 安装镜像
```
1、docker pull wurstmeister/zookeeper
2、docker pull wurstmeister/kafka
```
== Docker 启动 Zookeeper
``` docker run -d --name zookeeper -p 2181 -t wurstmeister/zookeeper ```
== Docker 启动 Kafka
``` docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 --env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka ```
= Producer / Consumer 相关配置参考文档
Producer 配置参数:: https://www.orchome.com/511
Consumer 配置参数:: https://www.orchome.com/535
= Producer Demo代码
```
/**
 * Producer demo: starts 10 threads, each publishing string messages to the
 * "HelloWorld" topic with a random numeric key.
 */
public static void main(String[] args) {
    for (int f = 0; f < 10; f++) {
        final int index = f;
        new Thread(() -> {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "127.0.0.1:9092");
            properties.put("acks", "all");
            properties.put("retries", 0);
            properties.put("batch.size", 16384);
            properties.put("linger.ms", 1);
            // NOTE(review): 33554 bytes is an unusually small buffer; the broker
            // default is 33554432 — looks like a truncated value, confirm intent.
            properties.put("buffer.memory", 33554);
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            String topic = "HelloWorld";
            // try-with-resources: the original called producer.close() in finally,
            // which throws NullPointerException when the KafkaProducer constructor
            // itself fails (producer is still null at that point).
            try (Producer<String, String> producer = new KafkaProducer<String, String>(properties)) {
                for (int i = 3000; i < 100000; i++) {
                    String key = RandomUtils.nextInt(0, i) + "";
                    String msg = "Message " + key;
                    producer.send(new ProducerRecord<String, String>(topic, key, msg));
                    System.out.println("Send: "+index+" " + key + "==>" + msg);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }
}
```
= Consumer Demo代码
```
/**
 * Consumer demo entry point: starts one consumer thread per configured topic.
 */
public static void main(String[] args) {
    // Plain java.util.ArrayList instead of Guava's Lists.newArrayList() —
    // no third-party dependency is needed for this.
    List<String> topicList = new ArrayList<>();
    topicList.add("HelloWorld");
    // topicList.add("hellow_stream_out");
    // topicList.add("hellow_stream_out_2");
    for (String topic : topicList) {
        new Thread(startConsumer(topic)).start();
    }
}
/**
 * Builds a Runnable that consumes the given topic forever, printing each record
 * and checking that consumed offsets arrive contiguously.
 *
 * @param topic the Kafka topic to subscribe to
 * @return a Runnable intended to run on its own thread
 */
public static Runnable startConsumer(final String topic) {
    // Lambda instead of an anonymous Runnable — same functional interface.
    return () -> {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put("group.id", "group-1");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // try-with-resources so the consumer is closed (leaving the group cleanly)
        // even when the loop dies with an exception; the original leaked it.
        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties)) {
            kafkaConsumer.subscribe(Arrays.asList(topic));
            long index = 0;
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                if (records.isEmpty()) {
                    continue;
                }
                long time = System.currentTimeMillis();
                System.out.println(" start time:" + time);
                for (ConsumerRecord<String, String> record : records) {
                    // NOTE(review): offsets are contiguous only within a single
                    // partition; this check assumes the topic has one partition —
                    // confirm before relying on it for multi-partition topics.
                    if (index == 0) {
                        index = record.offset();
                    } else if (record.offset() == (index + 1)) {
                        index = record.offset();
                    } else {
                        System.out.println(" 不连续的消费 " + record.offset() + "==>" + index);
                        // Explicit failure instead of the original "int d = 1 / 0" crash hack.
                        throw new IllegalStateException(
                                "non-contiguous offset: " + record.offset() + " after " + index);
                    }
                    System.out.println("topic = " + record.topic() + ", "
                            + "partition = " + record.partition() + ","
                            + "offset = " + record.offset() + ","
                            + " key = " + record.key() + ","
                            + " value = " + record.value());
                }
                System.out.println(" batch execute time:" + (System.currentTimeMillis() - time) + " size:" + records.count());
            }
        }
    };
}
```
= 基于Kafka的简单聊天功能
```
/**
 * Simple Kafka-based chat demo: asks the user for a name, then wires up a
 * background consumer and a foreground producer on the same topic.
 */
public static void main(String[] args) {
    final String bootstrapServers = "127.0.0.1:9092";
    final String chatTopic = "message_demo";
    Scanner input = new Scanner(System.in);
    System.out.print("请输入姓名:");
    String userName = input.nextLine();
    System.out.println("你姓名是:" + userName);
    // Start the receiving side (runs on its own thread).
    createConsumer(bootstrapServers, chatTopic);
    // Start the sending side (blocks the current thread reading stdin).
    createProducer(input, bootstrapServers, chatTopic, userName);
}
/**
 * Reads lines from the scanner forever and publishes each non-blank line to the
 * topic, keyed by the sender's name.
 *
 * @param scanner    chat input source (normally wrapping System.in)
 * @param serverHost Kafka bootstrap server, e.g. "127.0.0.1:9092"
 * @param topic      destination topic
 * @param name       sender name, used as the record key
 */
public static void createProducer(Scanner scanner, String serverHost, String topic, String name) {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", serverHost);
    properties.put("acks", "all");
    properties.put("retries", 0);
    properties.put("batch.size", 16384);
    properties.put("linger.ms", 1);
    properties.put("buffer.memory", 33554);
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // try-with-resources: the original called producer.close() in finally, which
    // throws NullPointerException if the KafkaProducer constructor itself failed.
    try (Producer<String, String> producer = new KafkaProducer<String, String>(properties)) {
        while (true) {
            String msg = scanner.nextLine();
            // Stdlib blank check instead of commons-lang StringUtils.isBlank;
            // Scanner.nextLine never returns null (it throws instead).
            if (msg.trim().isEmpty()) {
                continue;
            }
            // Use name directly — the original's `name + ""` was a no-op conversion.
            producer.send(new ProducerRecord<String, String>(topic, name, msg));
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
/**
 * Starts a daemon-style background thread that consumes the chat topic forever
 * and prints each message as "name= key, msg: value".
 *
 * @param serverHost Kafka bootstrap server, e.g. "127.0.0.1:9092"
 * @param topic      topic to subscribe to
 */
public static void createConsumer(final String serverHost, final String topic) {
    new Thread(() -> {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", serverHost);
        properties.put("group.id", "group-1");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // try-with-resources: the original never closed the consumer, so a crash
        // in the loop would leave it in the group until the session timed out.
        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties)) {
            kafkaConsumer.subscribe(Arrays.asList(topic));
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("name= "+record.key()+", msg:" + record.value());
                }
            }
        }
    }).start();
}
```