go-zero(十二)基于 Kafka:实现消息队列

go zero 基于 Kafka:实现消息队列

在微服务架构中,消息队列主要通过异步通信实现服务间的解耦,使得各个服务可以独立发展和扩展。

go-zero中使用的队列组件go-queue,是gozero官方实现的基于Kafka和Beanstalkd 的消息队列框架,我们使用kafka作为演示。

一、kafka简单介绍

Kafka 是一个开源的分布式流处理平台,主要用于构建实时数据管道和流应用。

1.Kafka 的架构

Kafka 的架构通常由以下几个部分构成:

  • Broker(节点):Kafka 集群由多个 broker 实例组成,负责管理消息的存储和处理。
  • Topic(主题):消息以主题的形式组织,每个主题可以有多个分区(partition),以支持高并发和扩展性。
  • Produce(生产者):消息生产者将数据发送到特定的topic中。
  • Consumer(消费者):消费者从topic中读取数据,可以将多个消费者分组以进行负载均衡。

2.Kafka 的关键特性

  1. 高吞吐量
    Kafka 设计上能够处理大量的实时数据流,具备非常高的吞吐量。这使得它能够轻松应对大规模数据流量,适合做日志聚合、监控数据处理等。

  2. 持久性
    Kafka 将消息持久化到磁盘,并提供复制功能,以确保数据的安全性和可靠性。即使在节点出现故障的情况下,也能保证数据不会丢失。

  3. 可扩展性
    Kafka 能够水平扩展,通过增加更多的节点来处理更多的消费者和生产者,这使得它能够应对越来越多的业务需求。

  4. 实时处理
    Kafka 提供低延迟的数据传输,这使得实时处理和分析成为可能。您可以瞬时处理到来的数据流。

  5. 支持多种消息传递模式
    Kafka 支持发布-订阅和点对点的消息传递模式,能够灵活适应不同场景下的需求。

  6. 强大的生态系统
    Kafka 拥有丰富的生态系统,包括 Kafka Streams 和 Kafka Connect,这些工具可以帮助开发者更方便地进行流处理和数据集成。

3.常见应用场景

  1. 日志聚合
    Kafka 可以作为一个集中式的日志聚合器,将分布在不同服务的日志集中到一个地方,方便后续分析和监控。

  2. 实时数据流处理
    使用 Kafka,用户可以实时处理和分析流数据,例如检测异常、生成实时报告等。

  3. 系统监控和事件追踪
    Kafka 经常用于收集和跟踪系统事件(如用户行为、系统状态等),并通过流式处理进行实时监控。

  4. 数据集成
    Kafka 可以作为数据的桥梁,连接不同的数据源和目标系统,方便实现数据的流转和转换。

  5. 消息队列
    Kafka 可用作高效的消息队列,实现服务间的异步通信。例如,在微服务架构中,服务 A 可以将消息发送到 Kafka,而服务 B 可以异步地从 Kafka 中读取处理这些消息。

二、环境部署

1.Docker安装Kafka

为了方便快捷地搭建 Kafka 环境,我们可以使用 Docker 进行安装。以下是一个 Docker Compose 的配置文件示例:

version: '3'

######## 项目依赖的环境,启动项目之前要先启动此环境 #######

services:

  #zookeeper是kafka的依赖 - Zookeeper is the dependencies of Kafka
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    environment:
      # 时区上海 - Time zone Shanghai (Change if needed)
      TZ: Asia/Shanghai
    restart: always
    ports:
      - 2181:2181
    networks:
      - gozero_net

  #消息队列 - Message queue
  kafka:
    image: 'bitnami/kafka:3.6.2'
    container_name: kafka
    restart: always
    ulimits:
      nofile:
        soft: 65536
        hard: 65536
    environment:
      - TZ=Asia/Shanghai
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    ports:
      - '9092:9092'
      - '9094:9094'
    volumes:
      - ./volumes/kafka:/bitnami/kafka

networks:
  gozero_net:
    driver: bridge
    ipam:
      config:
        - subnet: 172.16.0.0/16

在这个配置文件中,我们首先定义了一个 Zookeeper 服务,因为 Kafka 依赖于 Zookeeper 来进行集群管理和协调。然后定义了 Kafka 服务,其中 Kafka 对外暴露的端口为9094

通过以下命令可以启动这个 Docker 容器:

docker compose up

2.创建和管理 Kafka Topic

进入kafka容器

我们可以通过以下命令进入 Kafka 容器:

docker exec -it {{容器ID}} /bin/bash
#或者直接使用容器名
docker exec -it kafka /bin/bash

进入kafka执行命令目录
进入容器后,需要切换到 Kafka 执行命令的目录:

cd /opt/bitnami/kafka/bin

创建topic

创建名为topic-test的topic

./kafka-topics.sh --create --topic topic-test --bootstrap-server localhost:9092

查看topic信息

./kafka-topics.sh --describe --topic topic-test --bootstrap-server localhost:9092

查看所有topic

./kafka-topics.sh --list --bootstrap-server localhost:9092

3.测试 Kafka Topic

为了验证 Kafka Topic 的功能是否正常,我们可以使用两个终端进行测试。

首先进入 Kafka 执行命令目录,然后分别执行以下两个命令:

生产消息

./kafka-console-producer.sh --topic topic-test --bootstrap-server localhost:9092

消费消息

./kafka-console-consumer.sh --topic topic-test --bootstrap-server localhost:9092

此时,在生产者终端输入消息,消费者终端会自动同步接收到这些消息,从而验证了 Kafka Topic 的正常工作。

image.png

三、项目演示

1. 拉取依赖

项目中首先要拉取 go-queue 的依赖

go get github.com/zeromicro/go-queue@latest

2.配置说明

在 Go Zero 项目中,我们使用以下结构体来配置 Kafka 相关的参数:

type KqConf struct {
   service.ServiceConf

   // Brokers: Kafka 的多个 Broker 节点
   Brokers []string

   // Group: 消费者组
   Group string

   // Topic: 订阅的 Topic 主题
   Topic string

   // Offset: 如果新的 topic Kafka 没有对应的 offset 信息,或者当前的 offset 无效(历史数据被删除),
   // 需要指定从头(first)消费还是从尾(last)消费
   Offset string `json:",options=first|last,default=last"`

   // Conns: 一个 Kafka queue 对应可对应多个 consumer,Conns 对应 Kafka queue 数量,
   // 可以同时初始化多个 Kafka queue,默认只启动一个
   Conns int `json:",default=1"`

   // Consumers: go-queue 内部起多个 goroutine 从 Kafka 中获取信息写入进程内的 channel,
   // 此参数控制 goroutine 数量(⚠️ 并不是真正消费时的并发 goroutine 数量)
   Consumers int `json:",default=8"`

   // Processors: 当 Consumers 中的多个 goroutine 拉取到 Kafka 消息后,
   // 通过此参数控制当前消费逻辑的并发 goroutine 数量
   Processors int `json:",default=8"`

   // MinBytes: fetch 一次返回的最小字节数,若不够该字节数则会等待
   MinBytes int `json:",default=10240"`    // 10K

   // MaxBytes: fetch 一次返回的最大字节数,若第一条消息大小超过该限制仍会继续拉取,
   // 以确保 consumer 的正常运行。并非绝对配置,消息大小也受 broker 的 message.max.bytes 限制,
   // 以及 topic 的 max.message.bytes 限制
   MaxBytes int `json:",default=10485760"` // 10M

   // Username: Kafka 的账号(可选)
   Username string `json:",optional"`

   // Password: Kafka 的密码(可选)
   Password string `json:",optional"`
}

2.配置

配置文件

在项目的 yaml 配置文件中,我们需要添加当前的 Kafka 配置信息。为了方便,我们将生产者和消费者的配置都放在了一个配置文件中:

#....

#生产者
KqPusherConf:
  Name: log-producer
  Brokers:
    - 127.0.0.1:9094
  Group: logs-group
  Topic: topic-test
#消费者
KqConsumerConf:
    Name: log-consumer
    Brokers:
      - 127.0.0.1:9094
    Group: logs-group
    Topic: topic-test
    Offset: last
    Consumers: 8
    Processors: 8

config.go

在 internal/config 下的 config.go 中定义 go 映射的配置


type Config struct {

    /*
    .....
    */
    KqPusherConf   kq.KqConf
    KqConsumerConf kq.KqConf
}

svc注入
在 svc/serviceContext.go 中初始化 pusher 的 kq client

type ServiceContext struct {
    Config  config.Config
    KqPusherClient *kq.Pusher
    
}

func NewServiceContext(c config.Config) *ServiceContext {
    return &ServiceContext{
        Config:  c,
        KqPusherClient: kq.NewPusher(c.KqPusherConf.Brokers, c.KqPusherConf.Topic),
    }
}

3. 生产者实现

我们以登录功能为例来演示,如何使用go - queue 的 kq client 发送消息到 Kafka。当用户登录成功后,发送用户信息:


func (l *LoginLogic) Login(req *types.LoginRequest) (resp *types.LoginResponse, err error) {
    // todo: add your logic here and delete this line
    
    /*
    ....省略其他代码
    */
    
    //生产者需要异步执行,threading.GoSafe() 是go zero官方对 go func() 的安全封装
    threading.GoSafe(func() {
        
        logData := map[string]any{
            "user":   user.Username,
            "mobile": user.Mobile,
        }
        logs, _ := json.Marshal(logData)
        // 使用Push推送消息,消息为json
        err := l.svcCtx.KqPusherClient.Push(l.ctx, string(logs))
        if err != nil {
            logx.Errorf("KqPusherClient Push Error , err :%v", err)
        }
    })

    // 如果既没有验证码也没有密码
    return nil, errors.New(10010, "未提供有效的登录凭证") 
}

生产者需要异步执行,threading.GoSafe() 是go zero官方对 go func() 的安全封装,如果出现panics 会自动恢复。

4. 消费者实现

internal 下新建一个 mq 文件夹,然后在 mq 文件夹下新建一个消费者 consumer.go:

package mqs

import (
    "beyond/user/api/internal/svc"
    "context"
    "fmt"
    "github.com/zeromicro/go-zero/core/logc"
    "github.com/zeromicro/go-zero/core/logx"
)

//定义日志消费者
type LogsConsumer struct {
    ctx    context.Context
    svcCtx *svc.ServiceContext
}

// 定义构造方法
func NewLogsConsumer(ctx context.Context, svcCtx *svc.ServiceContext) *LogsConsumer {
    return &LogsConsumer{
        ctx:    ctx,
        svcCtx: svcCtx,
    }
}

// Consume为go zero内置接口, 实现Consume接口方法
func (l *LogsConsumer) Consume(ctx context.Context, key, val string) error {
    
    //logx.Infof("Consumer key :%s , val :%s", key, val)
    logc.Infof(ctx, "Consumer key :%s, val :%s", key, val)
    return nil
}

在这个文件中,我们定义了一个 LogsConsumer 结构体来表示日志消费者,并实现了Consume 方法,该方法是 go - queue 内置的接口方法,用于处理接收到的消息。

image.png

因为在实际应用中可能存在多个消费者,所以我们在 mq 文件夹下新建一个 mqs.go 文件来监听多个消费者,代码如下:

package mqs

import (
    "beyond/user/api/internal/config"
    "beyond/user/api/internal/svc"
    "context"

    "github.com/zeromicro/go-queue/kq"
    "github.com/zeromicro/go-zero/core/service"
)

func Consumers(c config.Config, ctx context.Context, svcCtx *svc.ServiceContext) []service.Service {
    // 监听消费者状态变化
    return []service.Service{
        //创建消息队列
        kq.MustNewQueue(c.KqConsumerConf, NewLogsConsumer(ctx, svcCtx)),
    }

}

在 main.go 中启动 consumers 等待消费


func main() {
    flag.Parse()

    var c config.Config
    conf.MustLoad(*configFile, &c)

    server := rest.MustNewServer(c.RestConf)
    defer server.Stop()

    svcCtx := svc.NewServiceContext(c)
    handler.RegisterHandlers(server, svcCtx)

    fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
    // 因为现在添加了mq,属于多服务状态,所以需要启动mq服务
    //server.Start()  
    //创建新的服务组
    serviceGroup := service.NewServiceGroup()
    defer serviceGroup.Stop()
    // 从mq中获取消费者服务,并添加到服务组中
    for _, mq := range mqs.Consumers(c, context.Background(), svcCtx) {
        serviceGroup.Add(mq)
    }
    //添加原来的server服务
    serviceGroup.Add(server)
    // 启动服务组
    serviceGroup.Start()

}

5.启动项目

我们可以启动项目,在当前的演示中,消费者只是简单地输出接收到的日志信息。

image.png

但在实际应用中,你可以根据业务需求进行拓展。例如,当接收到用户注册成功的消息时,可以拓展代码实现发送邮件或者短信给用户,提示用户注册成功;或者在接收到订单相关的消息时,进行订单状态的更新、库存的扣减等操作。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容