kafka 上手指南:单节点

大家好,我叫谢伟,是一名程序员。

今天的主题:kafka 使用指南,单节点版本。

1. 使用场景

如果你是一名后端工程师,设计的应用正常的线上运行,某次秒杀活动,突然间把系统搞崩了,排查系统发现很多的流量没有处理,导致系统挂了,这个时候有两种思路: 1. nginx 反向代理,把更多的请求转发给内部网络的服务器上进行处理,达到一个负载均衡的目的 2. 使用消息系统,将更多的请求使用中间件“缓存”起来,再从这个系统中不断的取到缓存的请求,进行进一步的处理。

后者使用到的消息系统,就是kafka 的一个使用场景。

那么什么是 kafka?

kafka 是一个分布式消息系统,目前已定位为分布式流式处理平台。

简单的说一个系统A 将消息发给消息系统,一个系统B 再从消息系统中取到消息,进行后续的处理。

常见的用来描述 kafka 应用场景的一个词是:削峰填谷,削减波峰流量,填充波谷流量,使系统尽量的平滑。

由此得处:kafka 的三个典型应用场景

  • 消息系统
  • 存储系统
  • 分布式流式处理平台

消息系统是目前最广泛的应用;消息传输需要存储起来,供后续系统拉取,故也可以当作存储系统;拉取消息之后,其实也是供后续系统处理,那么为什么不把数据处理也包含再kafka 系统中?分布式流式处理平台,大概就是这个意思。

下文陈述最核心的应用:消息系统

2. 基本概念

一条消息由系统A 产生,发往消息系统,系统B 从消息系统中拉取,这其中涉及到很多的概念。

  • 系统A 称为生产者 producer,目的是发送消息
  • 消息系统称为 broker,本质是服务进程目的是接受生产者的消息、消费者的消息拉取请求、持久化
  • 系统B 称为消费者 consumer, 目的是拉取消息系统中的消息

针对生产者、消费者有不同的设置参数,决定了生产者、消费者的不同行为。

生产者要发送消息,首先要知道发往何处,即要知道 broker 的地址,知道 broker 的地址,broker(kafka server) 的设置约束了持久化存储的地址及其他行为,除此之外,如何区分发的消息的类型不同呢?kafka 系统给这个区分消息的概念取了个逻辑概念:Topic , 即生产者指定的 Topic 不同,存储的地址就不同。

针对 Topic,简单的场景是,不断的往里面发内容,持久化存储就不断以追加的模式存储,简单场景没什么问题,问题是消息数据过多的话,不利于系统消费,很简单的想法,分不同的“文件”追加存储,把整体规模缩小,这个概念在 kafka 中称之为分区:partition. 消息可以不断的以追加的模式不断的发往分区内,分区有编号,起始位 0 ,消息追加模式存储在分区内,会给一个编号 offset

消费者从 broker 系统中拉取消息,首先要知道broker 地址,其次需要知道 Topic,更细化的还可以设置哪个分区,哪个偏移量 offset 开始,消费消息。

那消息万一丢了咋整?一个简单的做法就是冗余备份:Replication,多份备份,其中有一个是 Leader , 其他的是 follower, leader 的作用是和消息对接,follower 不直接和消息对接,只负责和 leader 对接,不断的同步数据。

多个 broker 构成 kafka 集群,万一一个挂了 kafka 系统依靠 zookeeper 进行重新选举产生新leader。

kafka cluster:


image

kafka topic: 分区概念

image

kafka 集群:

image

3. 客户端使用

基于上述概念:那么如何构建一个Kafka 服务,完成消息系统呢?

  • 启动服务进程:broker

伪代码:

type Broker struct{
    Addr 
    Config
    ...
}
  • 生产者连接 broker

伪代码:


type Producer struct{
    Config
    Message 
    ...
}

  • 消费者连接 broker

伪代码

type Consumer strcut{
    Config
    Topic 
    Partitions
    Offset
    ...
}

基本的思路:

  • 启动kafka服务
  • 系统A 连接服务,发送消息
  • 系统B 连接服务,消费消息

结合官网的示例:如何完成最基本的消息收发。

下载安装包:kafka_2.12-2.3.0.tgz

  • 2.12 指编译器版本
  • 2.3.0 指kafka 版本

解压之后,最重要的有两目录:

  • bin : 一系列的脚本,比如启动 zookeeper 服务,创建 topic,生产者生产消息,消费者消费消息等
zookeeper-server-start.sh
zookeeper-server-stop.sh
kafka-configs.sh
kafka-console-consumer.sh
kafka-console-producer.sh
kafka-consumer-groups.sh
kafka-topics.sh
kafka-server-start.sh
kafka-server-stop.sh
...

  • config: 配置文件:比如配置 zookeeper 端口,配置kafka 日志存储目录、对外端口,消息最大容量,保存时常等
zookeeper.properties
server.properties
producer.properties
consumer.properties
...

大概200多个参数吧,不好意思,我记不住。那怎么办?不学了吗,那挣不了钱,涨不了工资啊。

基本默认设置,部分按分类设置:

  • zookeeper.properties

kafka 依赖于 zookeeper 分布式协调

dataDir=/tmp/zookeeper
clientPort=2181

记住这个默认的 clientPort=2181

  • server.properties

kafka server 服务

log.dirs=/tmp/kafka-logs //日志存储目录
log.retention.hours=168 // 日志存储时长
broker.id=0 // 默认 broker id,集群方式的 kafka 设置,给每个 broker 编号
listeners=PLAINTEXT://:9092 // 对外提供的服务入口地址
zookeeper.connect=localhost:2181 // ZooKeeper集群地址
...
  • producer.properties

约定消息等的内容

  • consumer.properties

约定消费消息等的内容

配置好配置参数后:

  • 启动 zookeeper
> bin/zookeeper-server-start.sh config/zookeeper.properties
  • 启动 kafka 服务进程
> bin/kafka-server-start.sh config/server.properties

创建topic, 查询 topic 等可以使用:kafka-topics.sh

生产者生产消息可以使用:kafka-console-producer.sh

消费者消费消息可以使用:kafka-console-consumer.sh

当然,这些操作,一般只供测试使用,实际的使用是使用对应变成语言的客户端。

4. 演示

kafka go版本客户端:

下载安装:

go get -u -v github.com/Shopify/sarama

4.1 生产者

系统 A

  • 生产者
type KafkaAction struct {
    DataSyncProducer  sarama.SyncProducer
    DataAsyncProducer sarama.AsyncProducer
}
// 同步方式

func newDataSyncProducer(brokerList []string) sarama.SyncProducer {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
    config.Producer.Retry.Max = 5                    // Retry up to 10 times to produce the message
    config.Producer.Return.Successes = true
    config.Producer.Partitioner = sarama.NewRoundRobinPartitioner
    producer, err := sarama.NewSyncProducer(brokerList, config)
    if err != nil {
        log.Fatalln("Failed to start Sarama producer1:", err)
    }
    return producer

}

// 异步方式
func newDataAsyncProducer(brokerList []string) sarama.AsyncProducer {
    config := sarama.NewConfig()
    sarama.Logger = log.New(os.Stdout, "[KAFKA] ", log.LstdFlags)
    config.Producer.RequiredAcks = sarama.WaitForLocal       // Only wait for the leader to ack
    config.Producer.Compression = sarama.CompressionSnappy   // Compress messages
    config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms
    config.Producer.Partitioner = sarama.NewRoundRobinPartitioner
    producer, err := sarama.NewAsyncProducer(brokerList, config)
    if err != nil {
        log.Fatalln("Failed to start Sarama producer2:", err)
    }
    go func() {
        for err := range producer.Errors() {
            log.Println("Failed to write access log entry:", err)
        }
    }()
    return producer
}

还记得生产者有一系列配置参数吗?config 就这这个作用,有默认值,可以自己设置对应的值。

比如:压缩算法

config.Producer.Compression = sarama.CompressionSnappy

常用的压缩算法有:

  • gzip
  • snappy
  • lz4
  • zstd

不同的压缩算法主要在压缩比和吞吐量不同。

比如分区规则

config.Producer.Partitioner = sarama.NewRoundRobinPartitioner

常用的分区规则:

  • 轮询机制
  • 随机分区
  • 按 key 分区

比如:发送消息是否返回成功与否

onfig.Producer.RequiredAcks = sarama.WaitForLocal
  • 消息:生产者只传递字节组数据。

接口

type Encoder interface {
    Encode() ([]byte, error)
    Length() int
}

发送的消息需要实现Encoder 接口,即定义的消息结构体需要实现 Encode 和 Length 方法。

type SendMessage struct {
    Method  string `json:"method"`
    URL     string `json:"url"`
    Value   string `json:"value"`
    Date    string `json:"date"`
    encoded []byte
    err     error
}

func (S *SendMessage) Length() int {
    b, e := json.Marshal(S)
    S.encoded = b
    S.err = e
    return len(string(b))
}
func (S *SendMessage) Encode() ([]byte, error) {
    return S.encoded, S.err
}
  • 发送消息
func (K *KafkaAction) Do(v interface{}) {
    message := v.(SendMessage)
    // 发送的消息返回分区和偏移量
    partition, offset, err := K.DataSyncProducer.SendMessage(&sarama.ProducerMessage{
        Topic: TOPIC,
        Value: &message,
    })
    if err != nil {
        log.Println(err)
        return
    }
    value := map[string]string{
        "method": message.Method,
        "url":    message.URL,
        "value":  message.Value,
        "date":   message.Date,
    }
    fmt.Println(fmt.Sprintf("/%d/%d/%+v", partition, offset, value))
}

比如我们按照上面的配置发送消息:topic: topic-golang
partition/offset/value

/0/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/0/1/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/0/2/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/0/3/map[date:12344 method:get5 url:www.baidu.com4 value:da4]

上文只有一个 partition , offset值不断增加。

创建另外一个 topic, 分10个区。topic: topic-python

在日志中显示成咋样的呢?

// cd log.dirs  ; server.properties 中的设置

topic-golang-0
topic-python-0
topic-python-1
topic-python-2
topic-python-3
topic-python-4
topic-python-5
topic-python-6
topic-python-7
topic-python-8
topic-python-9

往 topic-python 中发送日志,分区规则轮询:

/0/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/1/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/2/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/3/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/4/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/5/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/6/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/7/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/8/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/9/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/0/1/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/1/1/map[date:12344 method:get5 url:www.baidu.com4 value:da4]

轮询,不断的往分区内存消息。

4.2 消费者

系统 B

func main() {
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true
    brokers := []string{"127.0.0.1:9092"}
    master, err := sarama.NewConsumer(brokers, config)
    if err != nil {
        panic(err)
    }
    defer func() {
        if err := master.Close(); err != nil {
            panic(err)
        }
    }()
    _, e := master.Partitions("topic-python")
    if e != nil {
        log.Println(e)
    }
    consumer, err := master.ConsumePartition("topic-python", 0, sarama.OffsetOldest)
    if err != nil {
        panic(err)
    }
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)
    doneCh := make(chan struct{})
    go func() {
        for {
            select {
            case err := <-consumer.Errors():
                fmt.Println(err)
            case msg := <-consumer.Messages():
                fmt.Println("Received messages", string(msg.Key), string(msg.Value), msg.Topic)
            case <-signals:
                fmt.Println("Interrupt is detected")
                doneCh <- struct{}{}
            }
        }
    }()
    <-doneCh
}
  • 消费者指定了 topic: topic-python
  • 消费者指定了 partition: 0

还记得生产者向 topic-python 内发送的消息吗?
partition/offset/value

/0/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/1/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/2/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/3/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/4/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/5/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/6/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/7/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/8/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/9/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/0/1/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/1/1/map[date:12344 method:get5 url:www.baidu.com4 value:da4]

可以看出:partition: 0 中有两条消息。那么消费者指定了分区,只能消费这两条消息。

Received messages  {"method":"get5","url":"www.baidu.com4","value":"da4","date":"12344"} topic-python
Received messages  {"method":"get5","url":"www.baidu.com4","value":"da4","date":"12344"} topic-python

4.3 其他

使用 kafka 客户端 ,那么我们还需要哪些功能?

  • 关于 Topic 的创建、描述、删除等
  • 消费者组描述等
  • 元信息:metadata
type ClusterAdmin interface {
    CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
    ListTopics() (map[string]TopicDetail, error)
    DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)
    DeleteTopic(topic string) error
    CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
    DeleteRecords(topic string, partitionOffsets map[int32]int64) error
    DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)
    AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error
    CreateACL(resource Resource, acl Acl) error
    ListAcls(filter AclFilter) ([]ResourceAcls, error)
    DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)
    ListConsumerGroups() (map[string]string, error)
    DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)
    ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
    DeleteConsumerGroup(group string) error
    DescribeCluster() (brokers []*Broker, controllerID int32, err error)
    Close() error
}

关于单节点 kafka 的基本应用就这些。

5. 容器服务

任何提供服务的系统,都可以使用容器版本,kafka 也可以使用容器版本。配置可以使用环境变量的形式设置。

docker-compose.yml

version: '2'
services:
  ui:
    image: index.docker.io/sheepkiller/kafka-manager:latest
    depends_on:
      - zookeeper
    ports:
      - 9000:9000
    environment:
      ZK_HOSTS: zookeeper:2181
  zookeeper:
    image: index.docker.io/wurstmeister/zookeeper:latest
    ports:
      - 2181:2181
  server:
    image: index.docker.io/wurstmeister/kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_OFFSETS_TOPIC_REPLIATION_FACTOR: 1
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  • zookeeper 分布式协调系统
  • kafka server Kafka 服务
  • kafka-manager kafka 管理平台

后续集群版本。

<完>

代码:https://github.com/wuxiaoxiaoshen/go-thirdparty

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,100评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,308评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,718评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,275评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,376评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,454评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,464评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,248评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,686评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,974评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,150评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,817评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,484评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,140评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,374评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,012评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,041评论 2 351

推荐阅读更多精彩内容

  • 大致可以通过上述情况进行排除 1.kafka服务器问题 查看日志是否有报错,网络访问问题等。 2. kafka p...
    生活的探路者阅读 7,584评论 0 10
  • 一、入门1、简介Kafka is a distributed,partitioned,replicated com...
    HxLiang阅读 3,345评论 0 9
  • 什么是消息系统? 早期两个应用程序间进行消息传递需要保证两个应用程序同时在线,并且耦合度很高。为了解决应用程序不在...
    Java小铺阅读 1,206评论 0 2
  • 1介绍 Kafka是一个分布式的、可分区的、可复制的消息系统,提供了一个生产者、缓冲区、消费者的模型。 Kafka...
    虫儿飞ZLEI阅读 629评论 0 1
  • 古之善为士者,微妙玄通,深不可识。夫唯不可识,故强为之容。豫焉若冬涉川,犹兮若畏四邻,俨兮其若容,涘兮若冰之将释,...
    沉醉的文人阅读 326评论 2 9