kafka基础知识

kafka.png
kafka.png
image.png
image.png

为什么要用kafka:

1.缓冲和削峰
2.解耦和扩展性
3.异步通信
4.健壮性
5.冗余

什么是ISR和AR

ISR:In-Sync Replicas 副本同步队列
AR:Assigned Replicas 所有副本

下面的所有终端命令基于都是windows10环境下
1.启动zookeeper:在kafka目录下执行:
首先需要修改config目录下的zookeeper.properties的配置,指定数据存放的目录
dataDir=D:/mysoftware/ELK/zookeeperdata

启动命令
bin\windows\zookeeper-server-start.bat config\zookeeper.properties

2.启动kafka,同理先修改配置文件,config下的server.properties
log.dirs=D:/mysoftware/ELK/kafkalog // 日志目录
zookeeper.connect=localhost:2181 //zookeeper地址

启动命令
bin\windows\kafka-server-start.bat config\server.properties

zookeeper:kafka节点启动的时候注册节点,查询节点(轮询,hash,权重)

操作kafka:github.com/Shopify/sarama 客户端包

生产者

package main

import (
    "fmt"
    "log"
    "os"
    "time"

    "github.com/Shopify/sarama"
)

var Address = []string{"127.0.0.1:9092"}

func main() {
    fmt.Println("hello")
    syncProducer(Address)
}

//同步消息模式
func syncProducer(address []string) {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    config.Producer.Timeout = 5 * time.Second
    p, err := sarama.NewSyncProducer(address, config)
    if err != nil {
        log.Printf("sarama.NewSyncProducer err, message=%s \n", err)
        return
    }
    defer p.Close()
    topic := "test"
    srcValue := "sync: this is a message. index=%d"
    for i := 0; i < 100; i++ {
        value := fmt.Sprintf(srcValue, i)
        msg := &sarama.ProducerMessage{
            Topic: topic,
            Value: sarama.ByteEncoder(value),
        }
        part, offset, err := p.SendMessage(msg)
        if err != nil {
            log.Printf("send message(%s) err=%s \n", value, err)
        } else {
            fmt.Fprintf(os.Stdout, value+"发送成功,partition=%d, offset=%d \n", part, offset)
        }
        time.Sleep(2 * time.Second)
    }
}

func SaramaProducer() {

    config := sarama.NewConfig()
    //等待服务器所有副本都保存成功后的响应
    config.Producer.RequiredAcks = sarama.WaitForAll
    //随机向partition发送消息
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    //是否等待成功和失败后的响应,只有上面的RequireAcks设置不是NoReponse这里才有用.
    config.Producer.Return.Successes = true
    config.Producer.Return.Errors = true
    //设置使用的kafka版本,如果低于V0_10_0_0版本,消息中的timestrap没有作用.需要消费和生产同时配置
    //注意,版本设置不对的话,kafka会返回很奇怪的错误,并且无法成功发送消息
    config.Version = sarama.V0_10_0_1

    fmt.Println("start make producer")
    //使用配置,新建一个异步生产者
    producer, e := sarama.NewAsyncProducer(Address, config)
    if e != nil {
        fmt.Println(e)
        return
    }
    defer producer.AsyncClose()

    //循环判断哪个通道发送过来数据.
    fmt.Println("start goroutine")
    go func(p sarama.AsyncProducer) {
        for {
            select {
            case <-p.Successes():
                //fmt.Println("offset: ", suc.Offset, "timestamp: ", suc.Timestamp.String(), "partitions: ", suc.Partition)
            case fail := <-p.Errors():
                fmt.Println("err: ", fail.Err)
            }
        }
    }(producer)

    var value string
    for i := 0; ; i++ {
        time.Sleep(500 * time.Millisecond)
        time11 := time.Now()
        value = "this is a message 0606 " + time11.Format("15:04:05")

        // 发送的消息,主题。
        // 注意:这里的msg必须得是新构建的变量,不然你会发现发送过去的消息内容都是一样的,因为批次发送消息的关系。
        msg := &sarama.ProducerMessage{
            Topic: "0606_test",
        }

        //将字符串转化为字节数组
        msg.Value = sarama.ByteEncoder(value)
        //fmt.Println(value)

        //使用通道发送
        producer.Input() <- msg
    }
}

消费者

package main

import (
    "fmt"
    // "log"
    // "os"
    // "os/signal"
    // "sync"

    //cluster "github.com/bsm/sarama-cluster"

    "github.com/Shopify/sarama"
)

func main() {
    consumer_test()
}


func consumer_test() {
    fmt.Printf("consumer_test")

    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true
    config.Version = sarama.V0_11_0_2

    // consumer
    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
    if err != nil {
        fmt.Printf("consumer_test create consumer error %s\n", err.Error())
        return
    }

    defer consumer.Close()

    partition_consumer, err := consumer.ConsumePartition("test", 0, sarama.OffsetNewest)
    if err != nil {
        fmt.Printf("try create partition_consumer error %s\n", err.Error())
        return
    }
    defer partition_consumer.Close()

    for {
        select {
        case msg := <-partition_consumer.Messages():
            fmt.Printf("msg offset: %d, partition: %d, timestamp: %s, value: %s\n",
                msg.Offset, msg.Partition, msg.Timestamp.String(), string(msg.Value))
        case err := <-partition_consumer.Errors():
            fmt.Printf("err :%s\n", err.Error())
        }
    }
}

终端启动消费者:进入kafka按照目录
bin\windows\kafka-console-consumer.bat --bootstrap-server=127.0.0.1:9092 --topic=test --from-beginning

重复消费和消息丢失的问题:
1.offset没有及时更新,导致消息重复发送消费
方法一:引入第三方中间件, 将偏移量存入redis, 每次消费前判断redis的偏移量(双重保证,看业务接收程度 )

2.offset有及时更新,但是业务处理失败,导致消息消费不成功,造成消息丢失
方法一:把数据处理和提交offset放在一个事务里
方法二:把数据处理操作编程幂等的(消费一个和多次效果是一样的)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,843评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,538评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,187评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,264评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,289评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,231评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,116评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,945评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,367评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,581评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,754评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,458评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,068评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,692评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,842评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,797评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,654评论 2 354

推荐阅读更多精彩内容