使用NodeJS连接Kafka(Kafka-node组件)

通常好产品可以理解为设计的原理很奇妙,能够健壮地执行,使用者实战也不用花太多精力学习。
Kafka是一款简单又实用的产品。知识的深入通常都是实战中遇到问题累积,沉绽后得到提升。好的产品设计的初衷通常是让使用者可以快速上手,马上投入运行。 至于是否真要深入,那就看使用者意愿了。 毕竟市面上成熟东西实在太多了,有足够的精力更应该多挖掘其他有前景的产品。
虽然这样说,在使用前Kafka该懂的原理还是必须得了解的。 尤其是Partitions分区(简单点,就是多条队列分支)。可谓不懂就不知道如何高性能应用。
读了网上很多的相关介绍,这一篇个人认为其图解是最通俗易懂的:漫画:图解 Kafka,看本篇就足够啦
建议没有掌握其原理的使用者可以先阅读了解一二。

使用docker-compose 部署Kafka

由于kafka运行依赖zookeeper,除了安装kafka外,还得先在系统上安装zookeeper。
经过网上的各种介绍和试错。最简单无脑的安装方式就是使用docker compose。
创建好docker-compose.yml 文件

version: '3.9'


services:
  zookeeper: 
    image: wurstmeister/zookeeper  ## 镜像
    container_name: zookeeper  ## 容器名称
    ports:
      - "2181:2181"   ## 对外暴露的端口号
    volumes:   
      - /etc/localtime:/etc/localtime  ## 挂载时区(kafka镜像和宿主机器之间时间保持一致)
  kafka:
    image: wurstmeister/kafka ## 镜像
    container_name: kafka ## 容器名称
    ports:
      - "9092:9092" ## 对外暴露的端口号
    environment: ## 环境变量
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1 #按实际应该是写入宿主IP
      KAFKA_CREATE_TOPICS: "test:1:1"
    depends_on: ## 依赖情况
      - zookeeper

构建容器即可:

cd [当前存放docker-compose.yml的目录]
docker-compose up -d

成功后,查看容器信息

% docker ps
CONTAINER ID   IMAGE                    COMMAND                  CREATED          STATUS          PORTS                                                                   NAMES
bb4a4fc60f84   wurstmeister/kafka       "start-kafka.sh"         58 minutes ago   Up 58 minutes   0.0.0.0:9092->9092/tcp, :::9092->9092/tcp                               kafka
b10413ec436e   wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   59 minutes ago   Up 59 minutes   22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp   zookeeper

测试验证

进入容器

docker exec -it kafka bash

1、创建 一个topic
参数说明:
--create --topic test 指示了要创建topic名字叫 test
--zookeeper zookeeper:2181 指示了 zookeeper 服务的地址

$ kafka-topics.sh --create --topic test --partitions 4 --zookeeper zookeeper:2181 --replication-factor 1

2、查看 topic 详情

$ kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic test Topic: test    TopicId: Pu_5qECuTR-Gfi7Br9qCvA PartitionCount: 1   ReplicationFactor: 1    Configs: 
    Topic: test Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001

3、获得topic的列表

$ kafka-topics.sh --list --zookeeper zookeeper:2181
__consumer_offsets
test

4、消息生产(即发送消息)

$ kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
#可以发送消息
>hello
>hello world

5、消费消息(即指接收消息方)

$ kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test
#可以开始接收到 producer.sh 发送的消息。

``

NodeJS连接Kafka(使用Kafka-node组件)

建立一个NodeJS项目(这里略过),下一步就是安装依赖

npm i kafka-node

第一个步骤就是连接kafka服务的代码

#引入组件
const kafka = require('kafka-node');
const client = new kafka.KafkaClient({kafkaHost: '127.0.0.1:9092'})

第二步,可以通过代码的方式创建topic

#创建两个topic
#其中主题为topic-for-1 拥有1个分区数
#主为 topic-for-8 拥有4个分区数

function createTopics(){
        const topicsToCreate = [{
            topic: 'topic-for-1',
            partitions: 1,     //分区数量
            replicationFactor: 1        //副本数量
          },
          {
            topic: 'topic-for-8',
            partitions: 4,           //分区数量
            replicationFactor: 1,     //副本数量
            // Optional set of config entries
            configEntries: [
              {
                name: 'compression.type',
                value: 'gzip'
              },
              {
                name: 'min.compaction.lag.ms',
                value: '50'
              }
            ]            
          }];

          client.createTopics(topicsToCreate, (error, result) => {
                console.log(result);
                console.log(error);
          });
    }

第三步,先编写消费消息(即指接收消息方)的代码段
这个组件会提供两种的类(kafka.Consumer 和kafka.ConsumerGroup)给使用者使用。通常情况下,使用Consumer即可。 ConsumerGroup封装了kafka.KafkaClient和kafka.Consumer,并且提供了更多可控的参数。

function createConsumer(callback){
        //消费者
        let self=this;
        let consumer = new kafka.Consumer(this.client,
        [
             { topic: 'topic-for-1', partition: 0 ,groupId:"my_group"},
             { topic: 'topic-for-8', partition: 1 }

#注:监听两个topic,分别名为topic-for-1,topic-for-8。其中
#监听topic-for-1的0号分区,设置组名为 my_group。
#即如果还有机器加入,并且组名也为my_group时,则代表同属同一个组。
         ],{
            autoCommit:true    //为true,代表自动提交offset
        });
        consumer.on('message',function(message){
            console.log('consumer receive message:')
            console.log(message);    
            callback(message);       
        });

        consumer.on('error',function(err){
            console.log('consumer err:')
            console.log(err);
        });
        return consumer;
    }

第四步,可以建立另一个项目来作为构建消息生产,或者同一个项目下使用setTimeout之类函数做演示。
组件提供两种消息生产(即发送消息)的类:kafka.HighLevelProducer和kafka.Producer。用法是一致,其中kafka.HighLevelProducer为高可用消息生产。

function createHighLevelProducter(callback){
        const producer=new kafka.HighLevelProducer(client);
        producer.on('ready',function(){
             console.log('kafka producer is connected and ready');
             callback(producer);
        });
        producer.on('error',function(err){
             console.log('kafka producer is error');
        });
}

最后,封装一系列的函数后, 如果只在一个终端上,我们就可以编写一小段代码来做测试。

const kafka = require('kafka-node');
const client = new kafka.KafkaClient({kafkaHost: '127.0.0.1:9092'})

#创建topic
createTopics();

#启动消息消费

createConsumer((message)=>{
      console.log('event callback receive message:');
      console.log(message);
});

createHighLevelProducter((producter)=>{
  
    #对主题:topic-for-1的第1个分区发送消息,对主题:topic-for-8的第2个分区发送消息。
    setTimeout(()=>{
           const record={
            topic:'topic-for-1',
            message:"Hello topic1!",
            key:'topic1-1',
            attributes:1,
            partition:0
          },
          {
            topic:'topic-for-8',
            message:"Hello topic8。",
            key:'topic8-1',
            attributes:1,
            partition:1
          };
          producter.send(record, (err, data) => {
                if (err) {
                  console.log(`producer send err: ${err}`)
                }
                console.log(data);
            });              
    },500);
});

完成后可以观察具体的数据结果。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,366评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,521评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,689评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,925评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,942评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,727评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,447评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,349评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,820评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,990评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,127评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,812评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,471评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,017评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,142评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,388评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,066评论 2 355

推荐阅读更多精彩内容