之前在做广告系统和小说项目中都使用kafka来作为mq的组件, 今天来简单记录下kafka
先看看Kafka能干什么
- 消息系统
- 存储系统
概念
- topic: 数据主题,是数据记录发布的地方,可以用来区分业务系统。Kafka中的Topics总是多订阅者模式,一个topic可以拥有一个或者多个消费者来订阅它的数据
首先作为消息系统
- 传统消息系统包含: '队列' 和 '发布-订阅' 两个模块, 消费者池从server读取数据,每条记录被池子中的一个消费者消费; 在发布订阅中,记录被广播到所有的消费者。两者均有优缺点。
- 队列的优点在于它允许你将处理数据的过程分给多个消费者实例,使你可以扩展处理过程。
- 不好的是,队列不是多订阅者模式的—一旦一个进程读取了数据,数据就会被丢弃。
- 而发布-订阅系统允许你广播数据到多个进程,但是无法进行扩展处理,因为每条消息都会发送给所有的订阅者。
- Kafka 在作为消息系统的优势在于
- 每个topic都可以扩展处理并且允许多订阅者模式—不需要只选择其中一个.
- 相比于传统消息队列还具有更严格的顺序保证
- topic中的partition是一个并行的概念。 Kafka能够为一个消费者池提供顺序保证和负载平衡,是通过将topic中的partition分配给消费者组中的消费者来实现的, 以便每个分区由消费组中的一个消费者消耗。通过这样,我们能够确保消费者是该分区的唯一读者,并按顺序消费数据。 众多分区保证了多个消费者实例间的负载均衡。但请注意,消费者组中的消费者实例个数不能超过分区的数量
作为存储系统
- 数据写入Kafka后被写到磁盘,并且进行备份以便容错
- Kafka使用磁盘结构,具有很好的扩展性—50kb和50TB的数据在server上表现一致。
- 可认为Kafka是一种高性能、低延迟、具备日志存储、备份和传播功能的分布式文件系统
Kafka可以使用在这些方面
- 构造实时流数据管道,它可以在系统或应用之间可靠地获取数据。 (相当于message queue)
- 构建实时流式应用程序,对这些流数据进行转换或者影响。 (就是流处理,通过kafka stream topic和topic之间内部进行变化)
具体表现
- 消息
- 跟踪网站活动
- 度量
- 日志聚合
- 流处理
- 采集日志
针对Kafka更加详细的信息, 可以去Kafka中文文档 查看
zookeeper安装
由于kafka需要用到zookeeper, 在这里我选择自己安装zookeeper, 当然,也可以使用kafka自带的zookeeper
下载
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.5/apache-zookeeper-3.5.5.tar.gz
解压zookeeper
tar -zxvf apache-zookeeper-3.5.5.tar.gz
复制conf/zoo_sample.cfg 命名为zoo.cfg
cd conf
cp zoo_sample.cfg zoo.cfg
启动zookeeper
cd bin
zkServer.sh start
Kafka的安装过程
下载
wget http://archive.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz
解压并重新命名
tar -zxvf kafka_2.11-1.0.0.tgz
mv kafka_2.11-1.0.0 kafka
添加环境变量
vim /etc/profile
export KAFKA_HOME=/usr/local/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile # 使环境变量生效
server.properties配置
broker.id=1 # 标识 集群需要更改
host.name=sanq1.com.cn # sanq1.com.cn是在host中配置的ip映射
listeners=PLAINTEXT://sanq1.com.cn:9092 # 监听器列表 0.0.0.0监听到任意ip 如果没有设置advertised.listeners 那么listeners一定不能设置0.0.0.0.0
advertised.listeners=PLAINTEXT://sanq1.com.cn:9092 # 监听器发布到ZooKeeper供客户端使用的地址 不能使用0.0.0.0
log.dirs=/usr/local/opt/kafka/kaf_logs # 日志目录, 方便出错查看信息, 集群必须
zookeeper.connect=sanq1.com.cn:2181 # 如果zookeeper集群的话 用 , 分割
num.partitions=1 # 默认分片为1 一般用于集群, 单机可不设置
启动
必须先启动zookeeper, 然后在启动kafka
# 启动kafka
nohup kafka-server-start.sh /usr/local/opt/kafka/config/server.properties >/dev/null 2>&1 &
说明:
让kafka在后台启动并且不会因为关闭客户端(xshell) 而停止kafka
到这里kafka就已经安装完成并且已经启动完成, 我们来简单试验一下
简单命令
- 创建topic, 创建一个名为“topic_test”的topic,它有一个分区和一个副本:
kafka-topics.sh --create --zookeeper sanq1.com.cn:2181 --replication-factor 1 --partitions 1 --topic topic_test
- 查看topic的列表
kafka-topics.sh --list --zookeeper sanq1.com.cn:2181
- 使用kafka自带的命令行客户端向topic_test发送消息
kafka-console-producer.sh --broker-list sanq1.com.cn:9092 --topic topic_test
- 启动一个consumer(消费者) 接收消息
# from-beginning 从第一行接收
kafka-console-consumer.sh --bootstrap-server sanq1.com.cn:9092 --from-beginning --topic topic_test
到此有关kafka的简介就全部完成。