1.kafka是啥?
A distributed streaming platform
-- kafka官网
翻译翻译,说kafka是 分布式流媒体平台,流媒体平台有三个关键功能:
- 发布和订阅记录流,类似于消息队列或企业消息传递系统。(消息队列)
- 以容错的持久方式存储记录流。(持久化)
- 处理发生的记录流。(流处理)
2.配置
主要有两个文件需要安排:server.properties 和 zookeeper.properties
要先修改一下日志的目录,因为默认是在tmp目录下,tmp目录里的内容会丢失,具体原因是重启后内容会被删除。
server.properties:
原本: log.dirs=/tmp/kafka-logs
修改为:log.dirs=自定义目录
zookeeper.properties:
原本: dataDir=/tmp/zookeeper
修改为:dataDir=/usr/local/zookeeper/data
上述是kafka自带的配置文件,我们也可以通过配置server.properties去指定使用其他服务器上的zookeeper
但是!!!
server.properties中有一个关于zookeeper的部分
############################ Zookeeper ##############################
Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.zookeeper.connect=zookeeper所在的ip地址:端口(默认2181)
# Timeout in ms for connecting to zookeeperzookeeper.connection.timeout.ms=6000
3.启动
设置好的话,就不需要去启动kafka自带的zk了。
这里说一下如果其他没有配置
需要先启动自带的zookeeper
位置是 : bin/zookeeper-server-start.sh config/zookeeper.properties
这里注意一点!kafka生命周期里必须要有zookeep的存在!!!
kafka开启前,要先开启zk
kafka关闭后,才允许关闭zk
kafka启动命令:
bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &
解释一下:
1>/dev/null 2>&1 是将命令产生的输入和错误都输入到空设备,不输出到控制台 其中/dev/null 是空设备。
这样就打开了kafka,接下来可以 用flume去搭配使用kafka了,具体是修改flume的配置文件。
当然我们可以给kafka开启主题Topic,具体类似RabbitMQ里面设定路由规则。
比如说:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
上面这句就开启了一个test主题。我们可以向主题发送消息!
4.发送消息
启动一个生产者!这里可以说一下,kafka自带一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送到Kafka集群。默认情况下,每行将作为单独的消息发送。
首先启动生产者:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
然后发送一些信息:
> hello world
> hello???hello?!Oh!!!Fine~
5.接受消息
启动一个消费者:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
这样就可以直接接到信息了!