Go使用Kafka生产数据

Kafka官网介绍: http://kafka.apache.org/intro

Kafka 是一种高吞吐量的分布式发布订阅消息系统

一些术语:

"Broker" Kafka集群包含一个或多个服务器,这种服务器被称为broker 

"Topic" 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)

"Partition" Partition是物理上的概念,每个Topic包含一个或多个Partition.

"Producer" 负责发布消息到Kafka broker

"Consumer" 消息消费者,向Kafka broker读取消息的客户端。

"Consumer Group" 每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)


我司用的阿里云的kafka服务,研发人员不太用关注如何搭建和部署,

以下重点说一下本地开发环境用mac docker的搭建和调试开发和使用

mac安装docker和docker-compose略 

单机版 kafka 安装使用 wurstmeister/kafka-docker 的镜像

准备步骤

创建日志映射和脚本文件存储目录

# 日志存放目录mkdir-p~/data/docker/kafka/logs# .yml文件存放目录mkdir-p~/data/docker/compose/kafka# 文件读写权限chmod777~/data

在~/data/docker/compose/kafka 文件夹中创建 docker-compose-single-wurstmeister-kafka.yml 文件

编写脚本

由于 kafka 依赖 Zookeeper, 所以在脚本里也会把 Zookeeper 的镜像包一起拉下来

version:'2'services:  zookeeper:    image: wurstmeister/zookeeper    container_name:"zk-kafka"ports:-"2181:2181"kafka:    image: wurstmeister/kafka    container_name:"kafka-single"ports:-"9092:9092"environment:# client 要访问的 broker 地址KAFKA_ADVERTISED_HOST_NAME:127.0.0.1# 通过端口连接 zookeeperKAFKA_ZOOKEEPER_CONNECT: zookeeper:2181# 每个容器就是一个 broker,设置其对应的 IDKAFKA_BROKER_ID:0# 外部网络只能获取到容器名称,在内外网络隔离情况下# 通过名称是无法成功访问 kafka 的# 因此需要通过绑定这个监听器能够让外部获取到的是 IPKAFKA_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092# kafka 监听器,告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务。KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092# Kafka默认使用-Xmx1G -Xms1G的JVM内存配置,由于服务器小,调整下启动配置# 这个看自己的现状做调整,如果资源充足,可以不用配置这个KAFKA_HEAP_OPTS:"-Xmx256M -Xms128M"# 设置 kafka 日志位置KAFKA_LOG_DIRS:"/kafka/logs"volumes:-/var/run/docker.sock:/var/run/docker.sock# 挂载 kafka 日志# :前面的路径是你电脑上路径 后面是kafka容器日志路径-~/data/docker/kafka/logs:/kafka/logs

执行脚本

在终端执行以下命令创建容器并后台运行

docker-compose -f ~/data/docker/compose/kafka/docker-compose-single-wurstmeister-kafka.yml up -d

出现下方提示信息时, 即为创建容器成功

Creating kafka-single ...doneCreating zk-kafka    ...done

测试启动成功

使用 docker ps 查看容器是否运行

可以看到 kafka-single zk-kafka 两个容器都已经处于 running 状态了, 安装成功了; 接下来创建 topic 测试消息生产、消费 

创建 topic

进入 kakfa 容器内部

docker exec -it kafka-single bash

执行命令创建 topic

$KAFKA_HOME/bin/kafka-topics.sh --create --topic topic-test --zookeeper zk-kafka:2181 --replication-factor 1 --partitions 1

出现如下提示即为创建成功

发送消息

topic 创建成功后, 直接执行命令即可

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-test

输入"这是一个测试的消息~", 然后回车, 接下来去消费方消费消息

消费消息

新开一个窗口, 同样进入 kakfa 容器内部

docker exec -it kafka-single bash

执行命令消费 topic

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka-single:9092 --from-beginning --topic topic-test

终端中出现 "这是一个测试的消息~" 即为 成功消费

生产者:

消费者:

另外开发过程中为了看生产的数据到了那个partition 

建议安装个Mac Kafka可视化工具(kafkatool)

官网: https://www.kafkatool.com/download.html

直接点击下载安装 

之后

点击右下方test 按钮 测试连接成功 

yes 然后就可以开始使用了 

下面说一下用"github.com/segmentio/kafka-go"

官网上的demo:

// make a writer that produces to topic-A, using the least-bytes distributionw :=&kafka.Writer{    Addr:    kafka.TCP("localhost:9092"),    Topic:  "topic-A",    Balancer:&kafka.LeastBytes{},}err := w.WriteMessages(context.Background(),    kafka.Message{        Key:  []byte("Key-A"),        Value: []byte("Hello World!"),    },    kafka.Message{        Key:  []byte("Key-B"),        Value: []byte("One!"),    },    kafka.Message{        Key:  []byte("Key-C"),        Value: []byte("Two!"),    },)if err != nil {    log.Fatal("failed to write messages:", err)}if err := w.Close(); err != nil {    log.Fatal("failed to close writer:", err)}


实际项目中的应用:

package mainimport "github.com/segmentio/kafka-go"var Writer *kafka.Writerfunc init() {  initKafka()}func initKafka() {    brokers1 := iniconfig.GetString("market_click_kafka", "brokers1")    brokers2 := iniconfig.GetString("market_click_kafka", "brokers2")    brokers3 := iniconfig.GetString("market_click_kafka", "brokers3")    topic := iniconfig.GetString("market_click_kafka", "topic")    Writer =&kafka.Writer{        Addr:    kafka.TCP(brokers1, brokers2, brokers3),        Topic:    topic,        Balancer:&kafka.RoundRobin{},        Async: true, //异步写入    }}调用 click_kafka.gopackage click_kafkaimport (    "context"    "encoding/json"    "github.com/segmentio/kafka-go"    "go.uber.org/zap"    "mi.market/common/logger"    "mi.market/entitys"    "mi.market/services/tools/alarm"    "mi.market/services/tools/redis/market_redis")func WriteClickMessages(Writer *kafka.Writer, click entitys.Click) {    market_redis.IncrByForeverKey("market:clicks:kafka", 1)    jsonBytes, err := json.Marshal(click)    if err != nil {        alarm.LoggerAlarm("WriteClickMessagesjsonBytesFail", "exception", click.DeviceId, err.(error).Error())    }    logger.Info("WriteClickMessages", zap.Any("click", click))    err = Writer.WriteMessages(        context.Background(),        kafka.Message{            Value: jsonBytes,        },    )    if err != nil {        alarm.LoggerAlarm("WriteClickMessagesFail", "exception", click.DeviceId, err.(error).Error())    }}注意:长连接 指针

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,185评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,652评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,524评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,339评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,387评论 6 391
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,287评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,130评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,985评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,420评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,617评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,779评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,477评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,088评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,716评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,857评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,876评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,700评论 2 354

推荐阅读更多精彩内容