【Kafka】Kafka入门手记

1. 前言

本文为 Kafka 入门笔记,主要包括 Kafka 单节点部署、生产消费消息,以及新手踩坑记录。

Kafka 作为大数据必备组件、消息中间件必学的 Apache 顶级开源项目,服务稳定、高吞吐的流数据处理平台。具体介绍可查看文末参考文档。

目前 简书 暂不支持 markdown 收缩语句块,对于文章展示方面略有问题

1.1. 可查看美观版本

文档:Kafka 入门手记.md
链接:http://note.youdao.com/noteshare?id=d1c65daf3c137f29a860b5efd5dff944&sub=F4E6A5E5FB3946499E7C4C3E6022E1DC

2. Kafka单节点部署

2.1. 下载

http://kafka.apache.org

选择合适版本即可,这里选择最新版本。Linux 环境。

2.2. 解压

Kafka 安装包 后缀为 .tgz , 解压即可。

tar -zxvf kafka_package.tgz

其中,kafka_package.tgz 是 Kafka 安装包名称。

2.2.1. zookeeper 安装

Kafka 依赖于 zookeeper(ZK) 支持,本文直接采用 Kafka 安装包自带的 zookeeper。

也可以单独部署 zookeeper,使用方式一样。对于 生产环境,建议单独搭建 zookeeper。

2.3. 配置

2.3.1. zookeeper

进入 Kafka 解压包根目录,config 文件夹下的即为 Kafka 提供的默认配置文件。

此处我们修改下 zookeeper.propertieshost 信息。此处修改 host 配置,主要是为了避免在使用 Java 客户端连接解析为 localhost

host.name=your_ip
advertised.host.name=your_ip

其中,your_ip 配置为服务器外网 IP。

2.3.2. Kafka

同样在 config 配置文件下修改 server.properties 文件即可。

主要配置如下:

# 监听
listeners=PLAINTEXT://服务器内网IP:9092

# 以下两项类似 zookeeper 配置
advertised.listeners=PLAINTEXT://服务器外网IP:9092
host.name=外网IP

# zookeeper 连接信息
zookeeper.connect=zookeeper服务器IP:2181

2.4. 启动

先启动 zookeeper,因为 Kafka 启动的时候会连接注册 zookeeper。

2.4.1. zookeeper

启动 zookeeper,在 Kafka 根目录执行

./bin/zookeeper-server-start.sh ./config/zookeeper.properties

上述方式,会占有 shell 客户端窗口,如果想后台启动,添加参数 daemon 即可

./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties

2.4.2. kafka

类似 zookeeper 启动。

./bin/kafka-server-start.sh ./config/server.properties

后台启动:

./bin/kafka-server-start.sh -daemon ./config/server.properties

此时 Kafka 单节点部署就已经完成了,通过 ps -ef | grep zookeeper, ps -ef | grep kafka 看到对应进程,证明启动成功。

2.5. topic

2.5.1. 创建 topic

通过 Kafka 提供的脚本文件,即可创建。在 Kafka 根目录下执行:

./config/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic topic_name

1)指定 zk 为 localhost:2181
2)副本因子为 1,即不需要副本
3)partition 数量为 3
4)topic 名称为 top_name

2.5.2. 查看 topic

./config/kafka-topics.sh --list --zookeeper localhost:2181
  1. list 命令用于查看
    2)需要指定 zk

2.6. 生产消费

此处直接通过 Kafka 提供的简单客户端进行生产消费数据。

2.6.1. 生产

1、启动简单 producer

./config/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_name

1) --broker-list 指定 Kafka 的地址及端口
2)--topic 指定具体 topic_name

2、 生产消息

直接在 producer 窗口输入消息即可,消息是否发送成功,直接在 consumer 窗口即可查看。

2.6.2. 消费

1、启动简单 consumer

./config/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_name --from-beginning

1)--bootstrap-server 指定 Kafka 地址及端口
2)-- topic 指定 topic
3)--from-beginning 表示指定从 offset 从头开始消费

2、消费数据

直接在 producer 窗口发送消息,然后切换至 consumer 窗口,查看是否成功消费消息。

3. Java Client 生产消费

此步基于 SpringBoot 进行搭建 demo 项目。 SpringBoot 版本为 2.x

3.1. 新建 SpringBoot 项目

直接通过 spring.starter 创建即可。完整项目: lambochen/demo/kafka

3.1.1. 引入依赖

Kafka 依赖:

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

3.2. Kafka 配置

3.2.1. producer

1、application.properties 配置

kafka.producer.servers=kafka服务器IP:服务器端口号
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960

kafka.topic.default=topic名称

2、ProducerFactory, KafkaTemplate 配置:

public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new LinkedHashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
    props.put(ProducerConfig.RETRIES_CONFIG, retries);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG,batchSize);
    props.put(ProducerConfig.LINGER_MS_CONFIG,linger);
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,bufferMemory);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
    return props;
}

public ProducerFactory<String, MessageEntity> producerFactory(){
    return new DefaultKafkaProducerFactory<>(
            producerConfigs(),
            new StringSerializer(),
            new JsonSerializer<MessageEntity>());
}

@Bean("kafkaTemplate")
public KafkaTemplate<String, MessageEntity> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

3.2.2. consumer

1、application.properties 配置

kafka.consumer.zookeeper.connect=ZK服务器端口:ZK端口
kafka.consumer.servers=Kafka服务器IP:Kafka端口
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.topic=topic名称
kafka.consumer.group.id=consumerGroup名称
kafka.consumer.concurrency=10

2、KafkaListenerContainerFactory 配置

public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MessageEntity>>
kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, MessageEntity> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(concurrency);
    factory.getContainerProperties().setPollTimeout(1500);
    return factory;
}

private ConsumerFactory<String, MessageEntity> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(
            consumerConfigs(),
            new StringDeserializer(),
            new JsonDeserializer<>(MessageEntity.class)
    );
}

private Map<String, Object> consumerConfigs() {
    Map<String, Object> propsMap = new HashMap<>();
    propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
    propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
    propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
    propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
    propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    return propsMap;
}

3.2.3. ProducerCallBack

生产回调,主要用于生产者发送消息后的处理。此 demo 仅作日志记录。

需要集成 ListenableFutureCallback ,并指定消息实体类型。

public class ProducerCallback implements ListenableFutureCallback<SendResult<String, MessageEntity>> 

其中, MessageEntity 为消息实体类型。

3.3. 生产消息

创建 ProducerRecord 消息,通过 KafkaTemplate 发送即可。

@Autowired
@Qualifier("kafkaTemplate")
private KafkaTemplate<String, MessageEntity> kafkaTemplate;

public void send(String topic, MessageEntity message) {
    kafkaTemplate.send(topic, message);
}

public void send(String topic, String key, MessageEntity message) {
    ProducerRecord<String, MessageEntity> record = new ProducerRecord<>(topic, key, message);
    long startTime = System.currentTimeMillis();
    ListenableFuture<SendResult<String, MessageEntity>> future = kafkaTemplate.send(record);
    future.addCallback(new ProducerCallback(startTime, key, message));

}

3.4. 消费消息

消费消息,通过 @KafkaConsumer 注解即可实现。

@KafkaListener(topics = "${kafka.topic.default}", containerFactory = "kafkaListenerContainerFactory")
public void consumer(MessageEntity message){
    log.info("consumer: " + gson.toJson(message));
}

3.5. 测试

启动 SpringBoot 项目,通过提供的 controller 进行请求生产消息。

查看日志,成功记录消息内容,即为生产、消费成功。

到此为止,Kafka demo 应用已完成啦

4. 踩坑记录

4.1. 打印日志

我在刚开始建好项目、配置 Kafka 后,启动项目失败,无日志输出,不好排查问题。

设置日志 level, application.properties 文件配置:

logging.level.root=debug

4.2. kafka 启动内存不足

kafka 启动 报错cannot allocate memory,即内存不足

4.3. java client 连接失败

按照本教程配置,已经避免了这个问题。

【kafka】Java连接出现Connection refused: no further information

5. 参考

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容

  • 大致可以通过上述情况进行排除 1.kafka服务器问题 查看日志是否有报错,网络访问问题等。 2. kafka p...
    生活的探路者阅读 7,580评论 0 10
  • 一、入门1、简介Kafka is a distributed,partitioned,replicated com...
    HxLiang阅读 3,345评论 0 9
  • Apache Kafka 入门 1.kafka简介和产生的背景 什么是 Kafka Kafka 是一款分布式消息发...
    阿粒_lxf阅读 1,786评论 0 0
  • 以下内容部分翻译至 http://kafka.apache.org/intro kafka介绍 我们认为,一个流处...
    若与阅读 8,758评论 0 22
  • 2017年的最后一个夜晚,在王小波的《黄金时代》里悄悄的穿行。 走走停停,反反复复。无数个真实而又虚幻的画面映入脑...
    悄然Edward阅读 194评论 0 1