写在开头
仅用于自己学习记录,本章节学习目的是快速上手kafka和体验go操作kafka
目录
1.消息队列
消息队列MQ(Message Queue)也被称为中间件,不存储消息内容,只是消息的搬运工,具体表现在:
- 在不同进程间传递消息
- 在统一进程的不同线程间传递消息
在该模式下,生产者只需要向消息队列投递消息,生产者只需要等消息队列搬运消息,此时,生产者和消费者就解耦了
2.Kafka
kafka是一个分布式,支持多分区,基于zookeeper的分布式消息流平台(元数据都保存在zookeeper中,因此3.7版本之前都需要先安装zookeeper)它同时也是一款开源的基于发布订阅模式的消息引擎系统。
为什么要学习kafka,对于数据密集型应用来说,kafka能很好帮助我们应对数据量的激增,举个例子,上游比如是300个示例的大型数据中心,下游是一个搜索和查询的引擎,中间件使用kafka隔离上下游业务,将上游激增的流量缓存起来,以平滑的方式传到到下游子系统中,避免了流量的不规则冲击。
2.1消息引擎系统
看名字就知道,它比MQ逼格更高,wiki上的介绍是
消息引擎是一组规范,企业利用这组规范在不同系统之间传递准确的消息,实现松耦合的异步式数据传递
即:
- 用于不同系统之间
- 传输的对象是消息
这么一看是不是和MQ大差不差,但是之所以把他叫做引擎,是它能把消息转换成一定的格式,即如何传输消息,如何设计待传输消息的格式都属于消息引擎设计的一部分(摩托车引擎把燃油转为动能,消息引擎也是如此,所以才叫引擎)。
实际上kafka在传输时使用的是纯二进制的字节序列
2.2为什么使用kafka
在这章开头举了如何对抗峰值流量例子,就是削峰填谷,缓冲上下游突发的流量,使其平滑,来保护下游服务
2.3Kafka术语
消息:record,指kafka处理的主要对象,类比就是数据库表中的一行记录
生产者/消费者:指发布/消费消息的应用程序
-
主题:Topic,承载消息的容器,类比就是数据库中的表,更直观点解释就是一个业务就是一个topic
-
分区:一个有序不变的消息序列,每个主题下可以有多个分区
消息位移offset:分区中每条消息的位置
副本replica:Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本,副本分为领导者副本和追随者副本,生产者只与领导者副本交互
消费者组:多个消费者实例组成一个组,同时消费多个消息以实现提高吞吐量(如果一个 topic 有 N 个分区,那么同一个消费组最多有 N 个消费者。多于这个数字的消
费者会被忽略。)消费者位移:表示消费者消费进度,每个消费者都有自己的消费者位移
重平衡:组内某个消费者挂了,其他实例自动重新分配订阅主题分区的过程
2.4集群配置参数
2.4.1Broker端参数
Broker需要配置存储信息,即Broker使用哪些磁盘,针对存储信息的重要参数有以下几个:
- log.dirs:指定Broker需要使用的若干个文件目录路径,没有默认值必须手动指定
- log.dir:补充上一个参数
实际只需要配置log.dirs即可,线上生产环境一定要配置多个路径(提升读写性能,实现故障转移),采用CSV格式(用逗号分隔多个路径,如/home/kafka1,/home/kafka2,/home/kafka3)
与zooKeeper相关设置:
- zookeeper.connect :zooKeeper集群连接,采用csv格式(zk1:2181,zk2:2181,zk3:2181)
Broker连接相关(客户端连接或与其他broker连接)
- listeners:告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务
- advertised.listeners 这组监听器是 Broker 用于对外发布的,即外网访问
- host.name/port:列出这两个参数就是想说你把它们忘掉吧,压根不要为它们指定值,毕竟都是过期的参数了
Topic相关
- auto.create.topics.enable:是否允许自动创建 Topic ,推荐设置false
- unclean.leader.election.enable:是否允许 Unclean Leader 选举,建议设置成false ,坚决不能让那些落后太多的副本竞选 Leader
- auto.leader.rebalance.enable是否允许定期进行 Leader 选举,推荐设置成false,在生产环境中换一次 Leader 代价很高的,原本向 A 发送请求的所有客户端都要切换成向 B 发送请求,而且这种换 Leader 本质上没有任何性能收益,因此我建议你在生产环境中把这个参数设置成 false。
数据保留
- log.retention.{hour|minutes|ms}:这是个“三兄弟”,都是控制一条消息数据被保存多长时间
- log.retention.bytes:这是指定 Broker 为消息保存的总磁盘容量大小,默认值-1,表示保存多少数据都可以
- message.max.bytes:控制 Broker 能够接收的最大消息大小
2.4.1 Topic级别参数
Kafka支持为不同的topic设置不同的参数值,Topic级别参数会覆盖全局broker参数
- retention.ms:规定了该 Topic 消息被保存的时长。默认是 7 天
- retention.bytes:规定了要为该 Topic 预留多大的磁盘空间
如何设置topic级别参数?
- 创建时设置( Kafka 开放了kafka-topics命令供我们来创建 Topic,--config用于设置topic级别参数 )
- 修改时设置(更推荐使用该种)
3.快速上手kafka
我用的3.7.0 不需要额外安装zookeeper
参考Docker---apache/kafka
sudo docker pull apache/kafka:3.7.0
sudo docker run -d --name kafka -p 9092:9092 apache/kafka:3.7.0
但是这种方式有一个弊端,我的kafka是安装在云服务器上的,本地的windows上无法访问!!这时我们想到可能是上面的参数在作祟
advertised.listeners
我们进入容器查看
sudo docker exec -it kafka /bin/bash
cd opt/kafka/config
cat server.properties | grep listeners
发现advertised.listeners的值为localhost:9092,只允许本地访问,我们需要将他修改成以下形式
//我在云服务器上,这个ip就是我云服务器的弹性公网ip
PLAINTEXT://ip:9092
但是很遗憾,在docker里该文件是只读,我们也没root权限,那么是否启动时修改配置参数就行了,可以,但很麻烦,根据kafka的docker介绍
Apache Kafka 支持多种代理配置,您可以通过环境变量覆盖这些配置。环境变量必须以 开头KAFKA_,代理配置中的任何点都应在相应的环境变量中指定为下划线。
需要注意的是,如果您要覆盖任何配置,则不会使用任何默认配置。
没错你不能光写一个
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localxxxxxx:9092
还要连其他的一起补充
docker run -d \
--name broker \
-e KAFKA_NODE_ID=1 \
-e KAFKA_PROCESS_ROLES=broker,controller \
-e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
-e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \
-e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \
-e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
-e KAFKA_NUM_PARTITIONS=3 \
apache/kafka:latest
学习成本蹭蹭上去了,所以我选择直接复制一个配置文件给他启动
将kafka里的配置拷贝出一份
mkdir -p config
sudo docker kafka:/opt/kafka/config/server.properties ./config
cd config
vim server.properties //修改成你的弹性公网ip
vim Dockerfile //写入下面两行
FROM apache/kafka:3.7.0
COPY server.properties /etc/kafka/docker
//构建kafka
sudo docker build -t="mykafka:1.0.0" .
//停止并删除之前的容器
sudo docker stop kafka
sudo docker rm kafka
//启动自己封装的镜像
sudo docker run -d -p 9092:9092 --name kafka mykafka:1.0.0
添加topic
//进入容器
sudo docker exec -it kafka /bin/bash
cd opt/kafka/bin
./kafka-topics.sh --create --bootstrap-server localhost:9092 \
--topic tests
检查topic
./kafka-topics.sh --list --bootstrap-server localhost:9092
3.1go连接kafka
选用sarama 因为用户多,注意现在文件移动到了IBM
go get -u github.com/IBM/sarama
下载消费者模拟工具模拟消费者消费消息
go install github.com/IBM/sarama/tools/kafka-console-consumer@latest
启动成功表示已经能成功连接远程kafka
kafka-console-consumer -topic tests -brokers ip地址:9092
编写生产者
func TestProducer(t *testing.T) {
cfg := sarama.NewConfig()
cfg.Producer.Return.Successes = true
cfg.Producer.Return.Errors = true
cfg.Version = sarama.MaxVersion
borkers := []string{"xxxxxxxx:9092"}
producer, err := sarama.NewAsyncProducer(borkers, cfg)
assert.NoError(t, err)
defer producer.Close()
msg := &sarama.ProducerMessage{
Topic: "tests",
Value: sarama.StringEncoder("hello"),
}
producer.Input() <- msg
select {
case success := <-producer.Successes():
t.Log(success.Partition, success.Offset)
return
case err := <-producer.Errors():
t.Log("发送失败", err)
}
}
消费端输出如下
Partition: 0
Offset: 2
Key:
Value: hello
生产者输出如下:
0,2