通常好产品可以理解为设计的原理很奇妙,能够健壮地执行,使用者实战也不用花太多精力学习。
Kafka是一款简单又实用的产品。知识的深入通常都是实战中遇到问题累积,沉绽后得到提升。好的产品设计的初衷通常是让使用者可以快速上手,马上投入运行。 至于是否真要深入,那就看使用者意愿了。 毕竟市面上成熟东西实在太多了,有足够的精力更应该多挖掘其他有前景的产品。
虽然这样说,在使用前Kafka该懂的原理还是必须得了解的。 尤其是Partitions分区(简单点,就是多条队列分支)。可谓不懂就不知道如何高性能应用。
读了网上很多的相关介绍,这一篇个人认为其图解是最通俗易懂的:漫画:图解 Kafka,看本篇就足够啦。
建议没有掌握其原理的使用者可以先阅读了解一二。
使用docker-compose 部署Kafka
由于kafka运行依赖zookeeper,除了安装kafka外,还得先在系统上安装zookeeper。
经过网上的各种介绍和试错。最简单无脑的安装方式就是使用docker compose。
创建好docker-compose.yml 文件
version: '3.9'
services:
zookeeper:
image: wurstmeister/zookeeper ## 镜像
container_name: zookeeper ## 容器名称
ports:
- "2181:2181" ## 对外暴露的端口号
volumes:
- /etc/localtime:/etc/localtime ## 挂载时区(kafka镜像和宿主机器之间时间保持一致)
kafka:
image: wurstmeister/kafka ## 镜像
container_name: kafka ## 容器名称
ports:
- "9092:9092" ## 对外暴露的端口号
environment: ## 环境变量
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1 #按实际应该是写入宿主IP
KAFKA_CREATE_TOPICS: "test:1:1"
depends_on: ## 依赖情况
- zookeeper
构建容器即可:
cd [当前存放docker-compose.yml的目录]
docker-compose up -d
成功后,查看容器信息
% docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
bb4a4fc60f84 wurstmeister/kafka "start-kafka.sh" 58 minutes ago Up 58 minutes 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp kafka
b10413ec436e wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 59 minutes ago Up 59 minutes 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp zookeeper
测试验证
进入容器
docker exec -it kafka bash
1、创建 一个topic
参数说明:
--create --topic test 指示了要创建topic名字叫 test
--zookeeper zookeeper:2181 指示了 zookeeper 服务的地址
$ kafka-topics.sh --create --topic test --partitions 4 --zookeeper zookeeper:2181 --replication-factor 1
2、查看 topic 详情
$ kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic test Topic: test TopicId: Pu_5qECuTR-Gfi7Br9qCvA PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: test Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
3、获得topic的列表
$ kafka-topics.sh --list --zookeeper zookeeper:2181
__consumer_offsets
test
4、消息生产(即发送消息)
$ kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
#可以发送消息
>hello
>hello world
5、消费消息(即指接收消息方)
$ kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test
#可以开始接收到 producer.sh 发送的消息。
``
NodeJS连接Kafka(使用Kafka-node组件)
建立一个NodeJS项目(这里略过),下一步就是安装依赖
npm i kafka-node
第一个步骤就是连接kafka服务的代码
#引入组件
const kafka = require('kafka-node');
const client = new kafka.KafkaClient({kafkaHost: '127.0.0.1:9092'})
第二步,可以通过代码的方式创建topic
#创建两个topic
#其中主题为topic-for-1 拥有1个分区数
#主为 topic-for-8 拥有4个分区数
function createTopics(){
const topicsToCreate = [{
topic: 'topic-for-1',
partitions: 1, //分区数量
replicationFactor: 1 //副本数量
},
{
topic: 'topic-for-8',
partitions: 4, //分区数量
replicationFactor: 1, //副本数量
// Optional set of config entries
configEntries: [
{
name: 'compression.type',
value: 'gzip'
},
{
name: 'min.compaction.lag.ms',
value: '50'
}
]
}];
client.createTopics(topicsToCreate, (error, result) => {
console.log(result);
console.log(error);
});
}
第三步,先编写消费消息(即指接收消息方)的代码段
这个组件会提供两种的类(kafka.Consumer 和kafka.ConsumerGroup)给使用者使用。通常情况下,使用Consumer即可。 ConsumerGroup封装了kafka.KafkaClient和kafka.Consumer,并且提供了更多可控的参数。
function createConsumer(callback){
//消费者
let self=this;
let consumer = new kafka.Consumer(this.client,
[
{ topic: 'topic-for-1', partition: 0 ,groupId:"my_group"},
{ topic: 'topic-for-8', partition: 1 }
#注:监听两个topic,分别名为topic-for-1,topic-for-8。其中
#监听topic-for-1的0号分区,设置组名为 my_group。
#即如果还有机器加入,并且组名也为my_group时,则代表同属同一个组。
],{
autoCommit:true //为true,代表自动提交offset
});
consumer.on('message',function(message){
console.log('consumer receive message:')
console.log(message);
callback(message);
});
consumer.on('error',function(err){
console.log('consumer err:')
console.log(err);
});
return consumer;
}
第四步,可以建立另一个项目来作为构建消息生产,或者同一个项目下使用setTimeout之类函数做演示。
组件提供两种消息生产(即发送消息)的类:kafka.HighLevelProducer和kafka.Producer。用法是一致,其中kafka.HighLevelProducer为高可用消息生产。
function createHighLevelProducter(callback){
const producer=new kafka.HighLevelProducer(client);
producer.on('ready',function(){
console.log('kafka producer is connected and ready');
callback(producer);
});
producer.on('error',function(err){
console.log('kafka producer is error');
});
}
最后,封装一系列的函数后, 如果只在一个终端上,我们就可以编写一小段代码来做测试。
const kafka = require('kafka-node');
const client = new kafka.KafkaClient({kafkaHost: '127.0.0.1:9092'})
#创建topic
createTopics();
#启动消息消费
createConsumer((message)=>{
console.log('event callback receive message:');
console.log(message);
});
createHighLevelProducter((producter)=>{
#对主题:topic-for-1的第1个分区发送消息,对主题:topic-for-8的第2个分区发送消息。
setTimeout(()=>{
const record={
topic:'topic-for-1',
message:"Hello topic1!",
key:'topic1-1',
attributes:1,
partition:0
},
{
topic:'topic-for-8',
message:"Hello topic8。",
key:'topic8-1',
attributes:1,
partition:1
};
producter.send(record, (err, data) => {
if (err) {
console.log(`producer send err: ${err}`)
}
console.log(data);
});
},500);
});
完成后可以观察具体的数据结果。