1. 前言
本文为 Kafka 入门笔记,主要包括 Kafka 单节点部署、生产消费消息,以及新手踩坑记录。
Kafka 作为大数据必备组件、消息中间件必学的 Apache 顶级开源项目,服务稳定、高吞吐的流数据处理平台。具体介绍可查看文末参考文档。
目前 简书 暂不支持 markdown 收缩语句块,对于文章展示方面略有问题
1.1. 可查看美观版本
文档:Kafka 入门手记.md
链接:http://note.youdao.com/noteshare?id=d1c65daf3c137f29a860b5efd5dff944&sub=F4E6A5E5FB3946499E7C4C3E6022E1DC
2. Kafka单节点部署
2.1. 下载
选择合适版本即可,这里选择最新版本。Linux 环境。
2.2. 解压
Kafka 安装包 后缀为 .tgz
, 解压即可。
tar -zxvf kafka_package.tgz
其中,kafka_package.tgz
是 Kafka 安装包名称。
2.2.1. zookeeper 安装
Kafka 依赖于 zookeeper(ZK) 支持,本文直接采用 Kafka 安装包自带的 zookeeper。
也可以单独部署 zookeeper,使用方式一样。对于 生产环境,建议单独搭建 zookeeper。
2.3. 配置
2.3.1. zookeeper
进入 Kafka 解压包根目录,config
文件夹下的即为 Kafka 提供的默认配置文件。
此处我们修改下 zookeeper.properties
的 host
信息。此处修改 host
配置,主要是为了避免在使用 Java 客户端连接解析为 localhost
host.name=your_ip
advertised.host.name=your_ip
其中,your_ip
配置为服务器外网 IP。
2.3.2. Kafka
同样在 config
配置文件下修改 server.properties
文件即可。
主要配置如下:
# 监听
listeners=PLAINTEXT://服务器内网IP:9092
# 以下两项类似 zookeeper 配置
advertised.listeners=PLAINTEXT://服务器外网IP:9092
host.name=外网IP
# zookeeper 连接信息
zookeeper.connect=zookeeper服务器IP:2181
2.4. 启动
先启动 zookeeper,因为 Kafka 启动的时候会连接注册 zookeeper。
2.4.1. zookeeper
启动 zookeeper,在 Kafka 根目录执行
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
上述方式,会占有 shell 客户端窗口,如果想后台启动,添加参数 daemon
即可
./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
2.4.2. kafka
类似 zookeeper 启动。
./bin/kafka-server-start.sh ./config/server.properties
后台启动:
./bin/kafka-server-start.sh -daemon ./config/server.properties
此时 Kafka 单节点部署就已经完成了,通过 ps -ef | grep zookeeper
, ps -ef | grep kafka
看到对应进程,证明启动成功。
2.5. topic
2.5.1. 创建 topic
通过 Kafka 提供的脚本文件,即可创建。在 Kafka 根目录下执行:
./config/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic topic_name
1)指定 zk 为 localhost:2181
2)副本因子为 1,即不需要副本
3)partition 数量为 3
4)topic 名称为 top_name
2.5.2. 查看 topic
./config/kafka-topics.sh --list --zookeeper localhost:2181
- list 命令用于查看
2)需要指定 zk
2.6. 生产消费
此处直接通过 Kafka 提供的简单客户端进行生产消费数据。
2.6.1. 生产
1、启动简单 producer
./config/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_name
1) --broker-list 指定 Kafka 的地址及端口
2)--topic 指定具体 topic_name
2、 生产消息
直接在 producer 窗口输入消息即可,消息是否发送成功,直接在 consumer 窗口即可查看。
2.6.2. 消费
1、启动简单 consumer
./config/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_name --from-beginning
1)--bootstrap-server 指定 Kafka 地址及端口
2)-- topic 指定 topic
3)--from-beginning 表示指定从 offset 从头开始消费
2、消费数据
直接在 producer 窗口发送消息,然后切换至 consumer 窗口,查看是否成功消费消息。
3. Java Client 生产消费
此步基于 SpringBoot 进行搭建 demo 项目。 SpringBoot 版本为 2.x
3.1. 新建 SpringBoot 项目
直接通过 spring.starter 创建即可。完整项目: lambochen/demo/kafka
3.1.1. 引入依赖
Kafka 依赖:
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
3.2. Kafka 配置
3.2.1. producer
1、application.properties
配置
kafka.producer.servers=kafka服务器IP:服务器端口号
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960
kafka.topic.default=topic名称
2、ProducerFactory
, KafkaTemplate
配置:
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new LinkedHashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.BATCH_SIZE_CONFIG,batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG,linger);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,bufferMemory);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
return props;
}
public ProducerFactory<String, MessageEntity> producerFactory(){
return new DefaultKafkaProducerFactory<>(
producerConfigs(),
new StringSerializer(),
new JsonSerializer<MessageEntity>());
}
@Bean("kafkaTemplate")
public KafkaTemplate<String, MessageEntity> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
3.2.2. consumer
1、application.properties
配置
kafka.consumer.zookeeper.connect=ZK服务器端口:ZK端口
kafka.consumer.servers=Kafka服务器IP:Kafka端口
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.topic=topic名称
kafka.consumer.group.id=consumerGroup名称
kafka.consumer.concurrency=10
2、KafkaListenerContainerFactory
配置
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MessageEntity>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, MessageEntity> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(concurrency);
factory.getContainerProperties().setPollTimeout(1500);
return factory;
}
private ConsumerFactory<String, MessageEntity> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(
consumerConfigs(),
new StringDeserializer(),
new JsonDeserializer<>(MessageEntity.class)
);
}
private Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return propsMap;
}
3.2.3. ProducerCallBack
生产回调,主要用于生产者发送消息后的处理。此 demo 仅作日志记录。
需要集成 ListenableFutureCallback
,并指定消息实体类型。
public class ProducerCallback implements ListenableFutureCallback<SendResult<String, MessageEntity>>
其中, MessageEntity
为消息实体类型。
3.3. 生产消息
创建 ProducerRecord
消息,通过 KafkaTemplate
发送即可。
@Autowired
@Qualifier("kafkaTemplate")
private KafkaTemplate<String, MessageEntity> kafkaTemplate;
public void send(String topic, MessageEntity message) {
kafkaTemplate.send(topic, message);
}
public void send(String topic, String key, MessageEntity message) {
ProducerRecord<String, MessageEntity> record = new ProducerRecord<>(topic, key, message);
long startTime = System.currentTimeMillis();
ListenableFuture<SendResult<String, MessageEntity>> future = kafkaTemplate.send(record);
future.addCallback(new ProducerCallback(startTime, key, message));
}
3.4. 消费消息
消费消息,通过 @KafkaConsumer
注解即可实现。
@KafkaListener(topics = "${kafka.topic.default}", containerFactory = "kafkaListenerContainerFactory")
public void consumer(MessageEntity message){
log.info("consumer: " + gson.toJson(message));
}
3.5. 测试
启动 SpringBoot 项目,通过提供的 controller 进行请求生产消息。
查看日志,成功记录消息内容,即为生产、消费成功。
到此为止,Kafka demo 应用已完成啦
4. 踩坑记录
4.1. 打印日志
我在刚开始建好项目、配置 Kafka 后,启动项目失败,无日志输出,不好排查问题。
设置日志 level, application.properties
文件配置:
logging.level.root=debug
4.2. kafka 启动内存不足
kafka 启动 报错cannot allocate memory,即内存不足
4.3. java client 连接失败
按照本教程配置,已经避免了这个问题。
【kafka】Java连接出现Connection refused: no further information