使用NodeJS连接Kafka(Kafka-node组件)

通常好产品可以理解为设计的原理很奇妙,能够健壮地执行,使用者实战也不用花太多精力学习。
Kafka是一款简单又实用的产品。知识的深入通常都是实战中遇到问题累积,沉绽后得到提升。好的产品设计的初衷通常是让使用者可以快速上手,马上投入运行。 至于是否真要深入,那就看使用者意愿了。 毕竟市面上成熟东西实在太多了,有足够的精力更应该多挖掘其他有前景的产品。
虽然这样说,在使用前Kafka该懂的原理还是必须得了解的。 尤其是Partitions分区(简单点,就是多条队列分支)。可谓不懂就不知道如何高性能应用。
读了网上很多的相关介绍,这一篇个人认为其图解是最通俗易懂的:漫画:图解 Kafka,看本篇就足够啦
建议没有掌握其原理的使用者可以先阅读了解一二。

使用docker-compose 部署Kafka

由于kafka运行依赖zookeeper,除了安装kafka外,还得先在系统上安装zookeeper。
经过网上的各种介绍和试错。最简单无脑的安装方式就是使用docker compose。
创建好docker-compose.yml 文件

version: '3.9'


services:
  zookeeper: 
    image: wurstmeister/zookeeper  ## 镜像
    container_name: zookeeper  ## 容器名称
    ports:
      - "2181:2181"   ## 对外暴露的端口号
    volumes:   
      - /etc/localtime:/etc/localtime  ## 挂载时区(kafka镜像和宿主机器之间时间保持一致)
  kafka:
    image: wurstmeister/kafka ## 镜像
    container_name: kafka ## 容器名称
    ports:
      - "9092:9092" ## 对外暴露的端口号
    environment: ## 环境变量
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1 #按实际应该是写入宿主IP
      KAFKA_CREATE_TOPICS: "test:1:1"
    depends_on: ## 依赖情况
      - zookeeper

构建容器即可:

cd [当前存放docker-compose.yml的目录]
docker-compose up -d

成功后,查看容器信息

% docker ps
CONTAINER ID   IMAGE                    COMMAND                  CREATED          STATUS          PORTS                                                                   NAMES
bb4a4fc60f84   wurstmeister/kafka       "start-kafka.sh"         58 minutes ago   Up 58 minutes   0.0.0.0:9092->9092/tcp, :::9092->9092/tcp                               kafka
b10413ec436e   wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   59 minutes ago   Up 59 minutes   22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp   zookeeper

测试验证

进入容器

docker exec -it kafka bash

1、创建 一个topic
参数说明:
--create --topic test 指示了要创建topic名字叫 test
--zookeeper zookeeper:2181 指示了 zookeeper 服务的地址

$ kafka-topics.sh --create --topic test --partitions 4 --zookeeper zookeeper:2181 --replication-factor 1

2、查看 topic 详情

$ kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic test Topic: test    TopicId: Pu_5qECuTR-Gfi7Br9qCvA PartitionCount: 1   ReplicationFactor: 1    Configs: 
    Topic: test Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001

3、获得topic的列表

$ kafka-topics.sh --list --zookeeper zookeeper:2181
__consumer_offsets
test

4、消息生产(即发送消息)

$ kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
#可以发送消息
>hello
>hello world

5、消费消息(即指接收消息方)

$ kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test
#可以开始接收到 producer.sh 发送的消息。

``

NodeJS连接Kafka(使用Kafka-node组件)

建立一个NodeJS项目(这里略过),下一步就是安装依赖

npm i kafka-node

第一个步骤就是连接kafka服务的代码

#引入组件
const kafka = require('kafka-node');
const client = new kafka.KafkaClient({kafkaHost: '127.0.0.1:9092'})

第二步,可以通过代码的方式创建topic

#创建两个topic
#其中主题为topic-for-1 拥有1个分区数
#主为 topic-for-8 拥有4个分区数

function createTopics(){
        const topicsToCreate = [{
            topic: 'topic-for-1',
            partitions: 1,     //分区数量
            replicationFactor: 1        //副本数量
          },
          {
            topic: 'topic-for-8',
            partitions: 4,           //分区数量
            replicationFactor: 1,     //副本数量
            // Optional set of config entries
            configEntries: [
              {
                name: 'compression.type',
                value: 'gzip'
              },
              {
                name: 'min.compaction.lag.ms',
                value: '50'
              }
            ]            
          }];

          client.createTopics(topicsToCreate, (error, result) => {
                console.log(result);
                console.log(error);
          });
    }

第三步,先编写消费消息(即指接收消息方)的代码段
这个组件会提供两种的类(kafka.Consumer 和kafka.ConsumerGroup)给使用者使用。通常情况下,使用Consumer即可。 ConsumerGroup封装了kafka.KafkaClient和kafka.Consumer,并且提供了更多可控的参数。

function createConsumer(callback){
        //消费者
        let self=this;
        let consumer = new kafka.Consumer(this.client,
        [
             { topic: 'topic-for-1', partition: 0 ,groupId:"my_group"},
             { topic: 'topic-for-8', partition: 1 }

#注:监听两个topic,分别名为topic-for-1,topic-for-8。其中
#监听topic-for-1的0号分区,设置组名为 my_group。
#即如果还有机器加入,并且组名也为my_group时,则代表同属同一个组。
         ],{
            autoCommit:true    //为true,代表自动提交offset
        });
        consumer.on('message',function(message){
            console.log('consumer receive message:')
            console.log(message);    
            callback(message);       
        });

        consumer.on('error',function(err){
            console.log('consumer err:')
            console.log(err);
        });
        return consumer;
    }

第四步,可以建立另一个项目来作为构建消息生产,或者同一个项目下使用setTimeout之类函数做演示。
组件提供两种消息生产(即发送消息)的类:kafka.HighLevelProducer和kafka.Producer。用法是一致,其中kafka.HighLevelProducer为高可用消息生产。

function createHighLevelProducter(callback){
        const producer=new kafka.HighLevelProducer(client);
        producer.on('ready',function(){
             console.log('kafka producer is connected and ready');
             callback(producer);
        });
        producer.on('error',function(err){
             console.log('kafka producer is error');
        });
}

最后,封装一系列的函数后, 如果只在一个终端上,我们就可以编写一小段代码来做测试。

const kafka = require('kafka-node');
const client = new kafka.KafkaClient({kafkaHost: '127.0.0.1:9092'})

#创建topic
createTopics();

#启动消息消费

createConsumer((message)=>{
      console.log('event callback receive message:');
      console.log(message);
});

createHighLevelProducter((producter)=>{
  
    #对主题:topic-for-1的第1个分区发送消息,对主题:topic-for-8的第2个分区发送消息。
    setTimeout(()=>{
           const record={
            topic:'topic-for-1',
            message:"Hello topic1!",
            key:'topic1-1',
            attributes:1,
            partition:0
          },
          {
            topic:'topic-for-8',
            message:"Hello topic8。",
            key:'topic8-1',
            attributes:1,
            partition:1
          };
          producter.send(record, (err, data) => {
                if (err) {
                  console.log(`producer send err: ${err}`)
                }
                console.log(data);
            });              
    },500);
});

完成后可以观察具体的数据结果。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容