kafka

写在开头

仅用于自己学习记录,本章节学习目的是快速上手kafka和体验go操作kafka

目录

文章目录

1.消息队列

消息队列MQ(Message Queue)也被称为中间件,不存储消息内容,只是消息的搬运工,具体表现在:

  • 在不同进程间传递消息
  • 在统一进程的不同线程间传递消息

消息队列的基本形态,就是有N个生产者,N个消费者
image.png

在该模式下,生产者只需要向消息队列投递消息,生产者只需要等消息队列搬运消息,此时,生产者和消费者就解耦了

2.Kafka

kafka是一个分布式,支持多分区,基于zookeeper的分布式消息流平台(元数据都保存在zookeeper中,因此3.7版本之前都需要先安装zookeeper)它同时也是一款开源的基于发布订阅模式的消息引擎系统

为什么要学习kafka,对于数据密集型应用来说,kafka能很好帮助我们应对数据量的激增,举个例子,上游比如是300个示例的大型数据中心,下游是一个搜索和查询的引擎,中间件使用kafka隔离上下游业务,将上游激增的流量缓存起来,以平滑的方式传到到下游子系统中,避免了流量的不规则冲击。

2.1消息引擎系统

看名字就知道,它比MQ逼格更高,wiki上的介绍是

消息引擎是一组规范,企业利用这组规范在不同系统之间传递准确的消息,实现松耦合的异步式数据传递

即:

  1. 用于不同系统之间
  2. 传输的对象是消息

这么一看是不是和MQ大差不差,但是之所以把他叫做引擎,是它能把消息转换成一定的格式,即如何传输消息,如何设计待传输消息的格式都属于消息引擎设计的一部分(摩托车引擎把燃油转为动能,消息引擎也是如此,所以才叫引擎)。
实际上kafka在传输时使用的是纯二进制的字节序列

2.2为什么使用kafka

在这章开头举了如何对抗峰值流量例子,就是削峰填谷,缓冲上下游突发的流量,使其平滑,来保护下游服务

2.3Kafka术语

极客时间中有趣的解释

image.png

  • 消息:record,指kafka处理的主要对象,类比就是数据库表中的一行记录

  • 生产者/消费者:指发布/消费消息的应用程序

  • 主题:Topic,承载消息的容器,类比就是数据库中的表,更直观点解释就是一个业务就是一个topic
    image.png
  • 分区:一个有序不变的消息序列,每个主题下可以有多个分区
    image.png
  • 消息位移offset:分区中每条消息的位置

  • 副本replica:Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本,副本分为领导者副本和追随者副本,生产者只与领导者副本交互

  • 消费者组:多个消费者实例组成一个组,同时消费多个消息以实现提高吞吐量(如果一个 topic 有 N 个分区,那么同一个消费组最多有 N 个消费者。多于这个数字的消
    费者会被忽略。)

  • 消费者位移:表示消费者消费进度,每个消费者都有自己的消费者位移

  • 重平衡:组内某个消费者挂了,其他实例自动重新分配订阅主题分区的过程

2.4集群配置参数

2.4.1Broker端参数

Broker需要配置存储信息,即Broker使用哪些磁盘,针对存储信息的重要参数有以下几个:

  • log.dirs:指定Broker需要使用的若干个文件目录路径,没有默认值必须手动指定
  • log.dir:补充上一个参数
    实际只需要配置log.dirs即可,线上生产环境一定要配置多个路径(提升读写性能,实现故障转移),采用CSV格式(用逗号分隔多个路径,如/home/kafka1,/home/kafka2,/home/kafka3)

与zooKeeper相关设置:

  • zookeeper.connect :zooKeeper集群连接,采用csv格式(zk1:2181,zk2:2181,zk3:2181)

Broker连接相关(客户端连接或与其他broker连接)

  • listeners:告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务
  • advertised.listeners 这组监听器是 Broker 用于对外发布的,即外网访问
  • host.name/port:列出这两个参数就是想说你把它们忘掉吧,压根不要为它们指定值,毕竟都是过期的参数了

Topic相关

  • auto.create.topics.enable:是否允许自动创建 Topic ,推荐设置false
  • unclean.leader.election.enable:是否允许 Unclean Leader 选举,建议设置成false ,坚决不能让那些落后太多的副本竞选 Leader
  • auto.leader.rebalance.enable是否允许定期进行 Leader 选举,推荐设置成false,在生产环境中换一次 Leader 代价很高的,原本向 A 发送请求的所有客户端都要切换成向 B 发送请求,而且这种换 Leader 本质上没有任何性能收益,因此我建议你在生产环境中把这个参数设置成 false。

数据保留

  • log.retention.{hour|minutes|ms}:这是个“三兄弟”,都是控制一条消息数据被保存多长时间
  • log.retention.bytes:这是指定 Broker 为消息保存的总磁盘容量大小,默认值-1,表示保存多少数据都可以
  • message.max.bytes:控制 Broker 能够接收的最大消息大小

2.4.1 Topic级别参数

Kafka支持为不同的topic设置不同的参数值,Topic级别参数会覆盖全局broker参数

  • retention.ms:规定了该 Topic 消息被保存的时长。默认是 7 天
  • retention.bytes:规定了要为该 Topic 预留多大的磁盘空间

如何设置topic级别参数?

  1. 创建时设置( Kafka 开放了kafka-topics命令供我们来创建 Topic,--config用于设置topic级别参数 )
  2. 修改时设置(更推荐使用该种)

3.快速上手kafka

我用的3.7.0 不需要额外安装zookeeper
参考Docker---apache/kafka

sudo docker pull apache/kafka:3.7.0

sudo docker run -d --name kafka -p 9092:9092 apache/kafka:3.7.0

但是这种方式有一个弊端,我的kafka是安装在云服务器上的,本地的windows上无法访问!!这时我们想到可能是上面的参数在作祟

advertised.listeners

我们进入容器查看

sudo docker  exec -it kafka /bin/bash

cd opt/kafka/config

cat server.properties | grep listeners

发现advertised.listeners的值为localhost:9092,只允许本地访问,我们需要将他修改成以下形式

//我在云服务器上,这个ip就是我云服务器的弹性公网ip
PLAINTEXT://ip:9092

但是很遗憾,在docker里该文件是只读,我们也没root权限,那么是否启动时修改配置参数就行了,可以,但很麻烦,根据kafka的docker介绍

Apache Kafka 支持多种代理配置,您可以通过环境变量覆盖这些配置。环境变量必须以 开头KAFKA_,代理配置中的任何点都应在相应的环境变量中指定为下划线。
需要注意的是,如果您要覆盖任何配置,则不会使用任何默认配置。

没错你不能光写一个

-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localxxxxxx:9092 

还要连其他的一起补充

docker run -d  \
  --name broker \
  -e KAFKA_NODE_ID=1 \
  -e KAFKA_PROCESS_ROLES=broker,controller \
  -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \
  -e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \
  -e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
  -e KAFKA_NUM_PARTITIONS=3 \
  apache/kafka:latest

学习成本蹭蹭上去了,所以我选择直接复制一个配置文件给他启动
将kafka里的配置拷贝出一份

mkdir -p config
sudo docker  kafka:/opt/kafka/config/server.properties ./config

cd config
vim server.properties //修改成你的弹性公网ip

vim Dockerfile //写入下面两行
FROM apache/kafka:3.7.0
COPY server.properties /etc/kafka/docker

//构建kafka
sudo docker build -t="mykafka:1.0.0" .

//停止并删除之前的容器
sudo docker stop kafka
sudo docker rm kafka

//启动自己封装的镜像
sudo docker run -d -p 9092:9092 --name kafka mykafka:1.0.0

添加topic

//进入容器
sudo docker exec -it kafka /bin/bash
cd opt/kafka/bin

./kafka-topics.sh --create --bootstrap-server localhost:9092 \
--topic tests 

检查topic

./kafka-topics.sh --list --bootstrap-server localhost:9092 

3.1go连接kafka

选用sarama 因为用户多,注意现在文件移动到了IBM

go get -u github.com/IBM/sarama

下载消费者模拟工具模拟消费者消费消息

go install github.com/IBM/sarama/tools/kafka-console-consumer@latest

启动成功表示已经能成功连接远程kafka

 kafka-console-consumer -topic tests -brokers ip地址:9092

编写生产者

func TestProducer(t *testing.T) {
    cfg := sarama.NewConfig()
    cfg.Producer.Return.Successes = true
    cfg.Producer.Return.Errors = true
    cfg.Version = sarama.MaxVersion

    borkers := []string{"xxxxxxxx:9092"}

    producer, err := sarama.NewAsyncProducer(borkers, cfg)
    assert.NoError(t, err)
    defer producer.Close()

    msg := &sarama.ProducerMessage{
        Topic: "tests",
        Value: sarama.StringEncoder("hello"),
    }

    producer.Input() <- msg

    select {
    case success := <-producer.Successes():
        t.Log(success.Partition, success.Offset)
                return
    case err := <-producer.Errors():
        t.Log("发送失败", err)
    }

}

消费端输出如下

Partition:      0
Offset: 2    
Key:
Value:  hello

生产者输出如下:

0,2
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,542评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,596评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,021评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,682评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,792评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,985评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,107评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,845评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,299评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,612评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,747评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,441评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,072评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,828评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,069评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,545评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,658评论 2 350

推荐阅读更多精彩内容