Kafka官网介绍: http://kafka.apache.org/intro
Kafka 是一种高吞吐量的分布式发布订阅消息系统
一些术语:
"Broker" Kafka集群包含一个或多个服务器,这种服务器被称为broker
"Topic" 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
"Partition" Partition是物理上的概念,每个Topic包含一个或多个Partition.
"Producer" 负责发布消息到Kafka broker
"Consumer" 消息消费者,向Kafka broker读取消息的客户端。
"Consumer Group" 每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)
我司用的阿里云的kafka服务,研发人员不太用关注如何搭建和部署,
以下重点说一下本地开发环境用mac docker的搭建和调试开发和使用
mac安装docker和docker-compose略
单机版 kafka 安装使用 wurstmeister/kafka-docker 的镜像
准备步骤
创建日志映射和脚本文件存储目录
# 日志存放目录mkdir-p~/data/docker/kafka/logs# .yml文件存放目录mkdir-p~/data/docker/compose/kafka# 文件读写权限chmod777~/data
在~/data/docker/compose/kafka 文件夹中创建 docker-compose-single-wurstmeister-kafka.yml 文件
编写脚本
由于 kafka 依赖 Zookeeper, 所以在脚本里也会把 Zookeeper 的镜像包一起拉下来
version:'2'services: zookeeper: image: wurstmeister/zookeeper container_name:"zk-kafka"ports:-"2181:2181"kafka: image: wurstmeister/kafka container_name:"kafka-single"ports:-"9092:9092"environment:# client 要访问的 broker 地址KAFKA_ADVERTISED_HOST_NAME:127.0.0.1# 通过端口连接 zookeeperKAFKA_ZOOKEEPER_CONNECT: zookeeper:2181# 每个容器就是一个 broker,设置其对应的 IDKAFKA_BROKER_ID:0# 外部网络只能获取到容器名称,在内外网络隔离情况下# 通过名称是无法成功访问 kafka 的# 因此需要通过绑定这个监听器能够让外部获取到的是 IPKAFKA_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092# kafka 监听器,告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务。KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092# Kafka默认使用-Xmx1G -Xms1G的JVM内存配置,由于服务器小,调整下启动配置# 这个看自己的现状做调整,如果资源充足,可以不用配置这个KAFKA_HEAP_OPTS:"-Xmx256M -Xms128M"# 设置 kafka 日志位置KAFKA_LOG_DIRS:"/kafka/logs"volumes:-/var/run/docker.sock:/var/run/docker.sock# 挂载 kafka 日志# :前面的路径是你电脑上路径 后面是kafka容器日志路径-~/data/docker/kafka/logs:/kafka/logs
执行脚本
在终端执行以下命令创建容器并后台运行
docker-compose -f ~/data/docker/compose/kafka/docker-compose-single-wurstmeister-kafka.yml up -d
出现下方提示信息时, 即为创建容器成功
Creating kafka-single ...doneCreating zk-kafka ...done
测试启动成功
使用 docker ps 查看容器是否运行
可以看到 kafka-single zk-kafka 两个容器都已经处于 running 状态了, 安装成功了; 接下来创建 topic 测试消息生产、消费
创建 topic
进入 kakfa 容器内部
docker exec -it kafka-single bash
执行命令创建 topic
$KAFKA_HOME/bin/kafka-topics.sh --create --topic topic-test --zookeeper zk-kafka:2181 --replication-factor 1 --partitions 1
出现如下提示即为创建成功
发送消息
topic 创建成功后, 直接执行命令即可
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-test
输入"这是一个测试的消息~", 然后回车, 接下来去消费方消费消息
消费消息
新开一个窗口, 同样进入 kakfa 容器内部
docker exec -it kafka-single bash
执行命令消费 topic
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka-single:9092 --from-beginning --topic topic-test
终端中出现 "这是一个测试的消息~" 即为 成功消费
生产者:
消费者:
另外开发过程中为了看生产的数据到了那个partition
建议安装个Mac Kafka可视化工具(kafkatool)
官网: https://www.kafkatool.com/download.html
直接点击下载安装
之后
点击右下方test 按钮 测试连接成功
yes 然后就可以开始使用了
下面说一下用"github.com/segmentio/kafka-go"
官网上的demo:
// make a writer that produces to topic-A, using the least-bytes distributionw :=&kafka.Writer{ Addr: kafka.TCP("localhost:9092"), Topic: "topic-A", Balancer:&kafka.LeastBytes{},}err := w.WriteMessages(context.Background(), kafka.Message{ Key: []byte("Key-A"), Value: []byte("Hello World!"), }, kafka.Message{ Key: []byte("Key-B"), Value: []byte("One!"), }, kafka.Message{ Key: []byte("Key-C"), Value: []byte("Two!"), },)if err != nil { log.Fatal("failed to write messages:", err)}if err := w.Close(); err != nil { log.Fatal("failed to close writer:", err)}
实际项目中的应用:
package mainimport "github.com/segmentio/kafka-go"var Writer *kafka.Writerfunc init() { initKafka()}func initKafka() { brokers1 := iniconfig.GetString("market_click_kafka", "brokers1") brokers2 := iniconfig.GetString("market_click_kafka", "brokers2") brokers3 := iniconfig.GetString("market_click_kafka", "brokers3") topic := iniconfig.GetString("market_click_kafka", "topic") Writer =&kafka.Writer{ Addr: kafka.TCP(brokers1, brokers2, brokers3), Topic: topic, Balancer:&kafka.RoundRobin{}, Async: true, //异步写入 }}调用 click_kafka.gopackage click_kafkaimport ( "context" "encoding/json" "github.com/segmentio/kafka-go" "go.uber.org/zap" "mi.market/common/logger" "mi.market/entitys" "mi.market/services/tools/alarm" "mi.market/services/tools/redis/market_redis")func WriteClickMessages(Writer *kafka.Writer, click entitys.Click) { market_redis.IncrByForeverKey("market:clicks:kafka", 1) jsonBytes, err := json.Marshal(click) if err != nil { alarm.LoggerAlarm("WriteClickMessagesjsonBytesFail", "exception", click.DeviceId, err.(error).Error()) } logger.Info("WriteClickMessages", zap.Any("click", click)) err = Writer.WriteMessages( context.Background(), kafka.Message{ Value: jsonBytes, }, ) if err != nil { alarm.LoggerAlarm("WriteClickMessagesFail", "exception", click.DeviceId, err.(error).Error()) }}注意:长连接 指针