golang kafka小试消息队列

Kafka 安装配置、更多资料请参考其官网。

启动 kafka server

在这之前需要启动 zookeeper 做服务治理(单机)。

$ bin/zkServer.sh status conf/zoo_sample.cfg

如提示权限限制加上 sudo

启动 kafka server

$ bin/kafka-server-start.sh config/server.properties

启动消息队列(本部分仅为测试 server)

新建 Topic

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

$ bin/kafka-topics.sh --list --zookeeper localhost:2181 (test)

1. 启动 Producer

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

2. 启动 Consumer

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

此时在 Producer 端发送消息,在 Consumer 就会显示,如下图所示。

(上图中 Consumer 多出了好几个消息是我截图之前测试发出的)


Action

本文使用 sarama 库作为 kafka 的 go API。sarama 库没有给出很具体的文档,可以参考其源码。

Producer

package main

import (
    "fmt"
    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.Return.Successes = true

    addr := []string{"localhost:9092"}

    producer, err := sarama.NewSyncProducer(addr, config)
    if err != nil {
        panic(err)
    }

    defer producer.Close()

    msg := &sarama.ProducerMessage{
        Topic:     "hello",
        Partition: int32(-1),
        Key:       sarama.StringEncoder("key"),
    }

    var value string
    for {
        _, err := fmt.Scanf("%s", &value)
        if err != nil {
            break
        }
        msg.Value = sarama.ByteEncoder(value)
        fmt.Println(value)

        partition, offset, err := producer.SendMessage(msg)
        if err != nil {
            fmt.Println("Send message Fail")
        }
        fmt.Printf("Partition = %d, offset=%d\n", partition, offset)
    }
}

Consumer

package main

import (
    "fmt"
    "sync"
    "github.com/Shopify/sarama"
)

var (
    wg  sync.WaitGroup
)

func main() {
    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
    if err != nil {
        panic(err)
    }

    partitionList, err := consumer.Partitions("hello")
    if err != nil {
        panic(err)
    }

    for partition := range partitionList {
        pc, err := consumer.ConsumePartition("hello", int32(partition), sarama.OffsetNewest)
        if err != nil {
            panic(err)
        }

        defer pc.AsyncClose()

        wg.Add(1)

        go func(sarama.PartitionConsumer) {
            defer wg.Done()
            for msg := range pc.Messages() {
                fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
            }

        }(pc)
    }
    wg.Wait()
    consumer.Close()
}

结果如下:

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,915评论 18 139
  • 背景介绍 Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下: 以时间复杂度为O...
    高广超阅读 12,884评论 8 167
  • 姓名:周小蓬 16019110037 转载自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw阅读 34,743评论 13 425
  • 我倾向于用PC写文章,因为这样能专注于屏幕,而不是键盘,手机就不同了,要看键盘。好吧,我承认这是我手机没有练习过两...
    郭青耀阅读 102评论 0 0
  • 2018-03-18 【指绘】 [背景建筑线稿是素材] 打了耳洞后用新软件画的第一幅图 (软件名:Medibang...
    木子明婳阅读 246评论 0 1