为什么要用kafka:
1.缓冲和削峰
2.解耦和扩展性
3.异步通信
4.健壮性
5.冗余
什么是ISR和AR
ISR:In-Sync Replicas 副本同步队列
AR:Assigned Replicas 所有副本
下面的所有终端命令基于都是windows10环境下
1.启动zookeeper:在kafka目录下执行:
首先需要修改config目录下的zookeeper.properties的配置,指定数据存放的目录
dataDir=D:/mysoftware/ELK/zookeeperdata
启动命令
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
2.启动kafka,同理先修改配置文件,config下的server.properties
log.dirs=D:/mysoftware/ELK/kafkalog // 日志目录
zookeeper.connect=localhost:2181 //zookeeper地址
启动命令
bin\windows\kafka-server-start.bat config\server.properties
zookeeper:kafka节点启动的时候注册节点,查询节点(轮询,hash,权重)
操作kafka:github.com/Shopify/sarama 客户端包
生产者
package main
import (
"fmt"
"log"
"os"
"time"
"github.com/Shopify/sarama"
)
var Address = []string{"127.0.0.1:9092"}
func main() {
fmt.Println("hello")
syncProducer(Address)
}
//同步消息模式
func syncProducer(address []string) {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.Timeout = 5 * time.Second
p, err := sarama.NewSyncProducer(address, config)
if err != nil {
log.Printf("sarama.NewSyncProducer err, message=%s \n", err)
return
}
defer p.Close()
topic := "test"
srcValue := "sync: this is a message. index=%d"
for i := 0; i < 100; i++ {
value := fmt.Sprintf(srcValue, i)
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(value),
}
part, offset, err := p.SendMessage(msg)
if err != nil {
log.Printf("send message(%s) err=%s \n", value, err)
} else {
fmt.Fprintf(os.Stdout, value+"发送成功,partition=%d, offset=%d \n", part, offset)
}
time.Sleep(2 * time.Second)
}
}
func SaramaProducer() {
config := sarama.NewConfig()
//等待服务器所有副本都保存成功后的响应
config.Producer.RequiredAcks = sarama.WaitForAll
//随机向partition发送消息
config.Producer.Partitioner = sarama.NewRandomPartitioner
//是否等待成功和失败后的响应,只有上面的RequireAcks设置不是NoReponse这里才有用.
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
//设置使用的kafka版本,如果低于V0_10_0_0版本,消息中的timestrap没有作用.需要消费和生产同时配置
//注意,版本设置不对的话,kafka会返回很奇怪的错误,并且无法成功发送消息
config.Version = sarama.V0_10_0_1
fmt.Println("start make producer")
//使用配置,新建一个异步生产者
producer, e := sarama.NewAsyncProducer(Address, config)
if e != nil {
fmt.Println(e)
return
}
defer producer.AsyncClose()
//循环判断哪个通道发送过来数据.
fmt.Println("start goroutine")
go func(p sarama.AsyncProducer) {
for {
select {
case <-p.Successes():
//fmt.Println("offset: ", suc.Offset, "timestamp: ", suc.Timestamp.String(), "partitions: ", suc.Partition)
case fail := <-p.Errors():
fmt.Println("err: ", fail.Err)
}
}
}(producer)
var value string
for i := 0; ; i++ {
time.Sleep(500 * time.Millisecond)
time11 := time.Now()
value = "this is a message 0606 " + time11.Format("15:04:05")
// 发送的消息,主题。
// 注意:这里的msg必须得是新构建的变量,不然你会发现发送过去的消息内容都是一样的,因为批次发送消息的关系。
msg := &sarama.ProducerMessage{
Topic: "0606_test",
}
//将字符串转化为字节数组
msg.Value = sarama.ByteEncoder(value)
//fmt.Println(value)
//使用通道发送
producer.Input() <- msg
}
}
消费者
package main
import (
"fmt"
// "log"
// "os"
// "os/signal"
// "sync"
//cluster "github.com/bsm/sarama-cluster"
"github.com/Shopify/sarama"
)
func main() {
consumer_test()
}
func consumer_test() {
fmt.Printf("consumer_test")
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Version = sarama.V0_11_0_2
// consumer
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
if err != nil {
fmt.Printf("consumer_test create consumer error %s\n", err.Error())
return
}
defer consumer.Close()
partition_consumer, err := consumer.ConsumePartition("test", 0, sarama.OffsetNewest)
if err != nil {
fmt.Printf("try create partition_consumer error %s\n", err.Error())
return
}
defer partition_consumer.Close()
for {
select {
case msg := <-partition_consumer.Messages():
fmt.Printf("msg offset: %d, partition: %d, timestamp: %s, value: %s\n",
msg.Offset, msg.Partition, msg.Timestamp.String(), string(msg.Value))
case err := <-partition_consumer.Errors():
fmt.Printf("err :%s\n", err.Error())
}
}
}
终端启动消费者:进入kafka按照目录
bin\windows\kafka-console-consumer.bat --bootstrap-server=127.0.0.1:9092 --topic=test --from-beginning
重复消费和消息丢失的问题:
1.offset没有及时更新,导致消息重复发送消费
方法一:引入第三方中间件, 将偏移量存入redis, 每次消费前判断redis的偏移量(双重保证,看业务接收程度 )
2.offset有及时更新,但是业务处理失败,导致消息消费不成功,造成消息丢失
方法一:把数据处理和提交offset放在一个事务里
方法二:把数据处理操作编程幂等的(消费一个和多次效果是一样的)