Kafka 安装配置、更多资料请参考其官网。
启动 kafka server
在这之前需要启动 zookeeper 做服务治理(单机)。
$ bin/zkServer.sh status conf/zoo_sample.cfg
如提示权限限制加上 sudo
。
启动 kafka server
$ bin/kafka-server-start.sh config/server.properties
启动消息队列(本部分仅为测试 server)
新建 Topic
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
$ bin/kafka-topics.sh --list --zookeeper localhost:2181 (test)
1. 启动 Producer
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
2. 启动 Consumer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
此时在 Producer 端发送消息,在 Consumer 就会显示,如下图所示。
(上图中 Consumer 多出了好几个消息是我截图之前测试发出的)
Action
本文使用 sarama 库作为 kafka 的 go API。sarama 库没有给出很具体的文档,可以参考其源码。
Producer
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
addr := []string{"localhost:9092"}
producer, err := sarama.NewSyncProducer(addr, config)
if err != nil {
panic(err)
}
defer producer.Close()
msg := &sarama.ProducerMessage{
Topic: "hello",
Partition: int32(-1),
Key: sarama.StringEncoder("key"),
}
var value string
for {
_, err := fmt.Scanf("%s", &value)
if err != nil {
break
}
msg.Value = sarama.ByteEncoder(value)
fmt.Println(value)
partition, offset, err := producer.SendMessage(msg)
if err != nil {
fmt.Println("Send message Fail")
}
fmt.Printf("Partition = %d, offset=%d\n", partition, offset)
}
}
Consumer
package main
import (
"fmt"
"sync"
"github.com/Shopify/sarama"
)
var (
wg sync.WaitGroup
)
func main() {
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
panic(err)
}
partitionList, err := consumer.Partitions("hello")
if err != nil {
panic(err)
}
for partition := range partitionList {
pc, err := consumer.ConsumePartition("hello", int32(partition), sarama.OffsetNewest)
if err != nil {
panic(err)
}
defer pc.AsyncClose()
wg.Add(1)
go func(sarama.PartitionConsumer) {
defer wg.Done()
for msg := range pc.Messages() {
fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
}
}(pc)
}
wg.Wait()
consumer.Close()
}
结果如下: