#1. 安装 Kafka 前需要先安装 ZooKeeper,因为 Kafka 依赖 ZooKeeper 进行管理
docker pull wurstmeister/zookeeper
如果提示是否安装最新版本,输入 Y
#2. 启动zookeeper容器
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
#3.安装kafka
docker pull wurstmeister/kafka
#4.启动kafka
docker run -d --name kafka -p 9092:9092 --env KAFKA_ADVERTISED_HOST_NAME=localhost -e KAFKA_ZOOKEEPER_CONNECT=218.25.54.37:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://218.25.54.37:9092 -e KAFKA_LISTENERS=PLAINTEXT://218.25.54.37:9092 -e KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" --net=host wurstmeister/kafka
#5 进入到容器中
docker exec -it kafka bash
#6 进入bin文件夹
#7 创建 topic,并启动生产者
./kafka-topics.sh --create --zookeeper 218.25.54.37:2181 --replication-factor 1 --partitions 8 --topic test
./kafka-console-producer.sh --broker-list 218.25.54.37:9092 --topic test
#8 打开新的窗口,进入容器并进入 bin 文件夹,启动消费者
./kafka-console-consumer.sh --bootstrap-server 218.25.54.37:9092 --topic test --from-beginning
在生产者窗口输入消息,在消费者窗口就可以查看了。
生产者截图:
消费者截图:
将 Kafka 集成到 Spring Boot
#1 pom 文件
<!--kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;
@Component
public class KafkaReceiver {
private final org.slf4j.Loggerlog = LoggerFactory.getLogger(getClass());
@KafkaListener(topics = {"linziheng"})
public void listen(ConsumerRecord record) {
Optional kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
log.info("----------------- record =" + record);
log.info("------------------ message =" + message);
}
}
}
测试接口:
@Autowired
private KafkaTemplatekafkaTemplate;
private Gsongson =new GsonBuilder().create();
/**
 * Kafka test endpoint: builds a {@code Message} populated with the current time in
 * milliseconds as id, a random UUID string as text and the current date as send time,
 * serializes it to JSON and sends it to the {@code linziheng} topic.
 */
@ApiOperation("kafka测试接口")
@PostMapping("/testKafka")
public void getRsJournalByUserId() {
    Message payload = new Message();
    payload.setId(System.currentTimeMillis());
    payload.setMsg(UUID.randomUUID().toString());
    payload.setSendTime(new Date());

    String json = gson.toJson(payload);
    kafkaTemplate.send("linziheng", json);
}
Message 类
@Data
public class Message {
private Longid;//id
private Stringmsg;//消息
private DatesendTime;//时间戳
}
请求接口后,控制台打印信息