Version:
kafka_2.12-2.2.0.tgz
1. Go to the directory containing kafka_2.12-2.2.0.tgz, then extract and move it
1) Extract and move:
tar -zxvf kafka_2.12-2.2.0.tgz
sudo mv kafka_2.12-2.2.0 /usr/local/kafka
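A quick sanity check that the extract and move worked (paths as above):
ls /usr/local/kafka
# expect bin, config, libs, ... in the listing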
2) Configure environment variables:
export JAVA_HOME=/usr/local/java
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib:$HADOOP_COMMON_LIB_NATIVE_DIR"
export HIVE_HOME=/usr/local/hive
export HIVE_CONF_DIR=${HIVE_HOME}/conf
export HCAT_HOME=$HIVE_HOME/hcatalog
export HIVE_DEPENDENCY=/usr/local/hive/conf:/usr/local/hive/lib/*:/usr/local/hive/hcatalog/share/hcatalog/hive-hcatalog-pig-adapter-2.3.5.jar:/usr/local/hive/hcatalog/share/hcatalog/hive-hcatalog-core-2.3.5.jar:/usr/local/hive/hcatalog/share/hcatalog/hive-hcatalog-server-extensions-2.3.5.jar:/usr/local/hive/hcatalog/share/hcatalog/hive-hcatalog-streaming-2.3.5.jar:/usr/local/hive/lib/hive-exec-2.3.5.jar
export ZOOKEEPER_HOME=/usr/local/zookeeper
export KAFKA_HOME=/usr/local/kafka
export HBASE_HOME=/usr/local/hbase
export SCALA_HOME=/usr/local/scala
export SPARK_HOME=/usr/local/spark
export KYLIN_HOME=/usr/local/kylin
export PATH=.:${JAVA_HOME}/bin:${SCALA_HOME}/bin:${SPARK_HOME}/bin:$PATH
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HIVE_HOME/bin:$HCAT_HOME/bin:$HBASE_HOME/bin:$ZOOKEEPER_HOME/bin:$KAFKA_HOME/bin:$KYLIN_HOME/bin
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:${HIVE_HOME}/lib:$HBASE_HOME/lib:$KYLIN_HOME/lib
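These exports normally live in /etc/profile or ~/.bashrc (adjust to wherever you keep them); after editing, reload the file so the current shell picks up the variables:
source /etc/profile
echo $KAFKA_HOME   # should print /usr/local/kafka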
2. Configure server.properties under kafka/config
# Empty the file first
echo > server.properties
Then write the following:
broker.id=0
listeners=PLAINTEXT://192.168.1.21:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# Some tutorials leave out the /kafka chroot here (and their later commands leave it out too). It makes Kafka create a /kafka node in ZooKeeper and keep all of its data (brokers, topics, and so on) under that node. Note that in a properties file a comment must sit on its own line, not after the value.
zookeeper.connect=192.168.1.21:2181/kafka
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
# Without this setting, a delete request only marks the topic for deletion
delete.topic.enable=true
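To review the settings actually in effect (skipping comments and blank lines), a simple filter helps:
grep -v '^#' server.properties | grep -v '^$'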
3. Start Kafka:
cd /usr/local/kafka
nohup bin/kafka-server-start.sh config/server.properties &
# Check nohup.out for error messages; if there are none, the broker started fine.
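Two more sanity checks, assuming the hostname master and the /kafka chroot configured above:
# the broker should appear as a Kafka JVM process
jps | grep -i kafka
# and should have registered itself in ZooKeeper under the chroot
/usr/local/zookeeper/bin/zkCli.sh -server master:2181 ls /kafka/brokers/ids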
4. Create and list topics
# Create a topic named first
bin/kafka-topics.sh --zookeeper master:2181/kafka --create --replication-factor 1 --partitions 1 --topic first
# List all topics
bin/kafka-topics.sh --zookeeper master:2181/kafka --list
5. Show topic details
# A single topic
bin/kafka-topics.sh --describe --zookeeper master:2181/kafka --topic first
# All topics
bin/kafka-topics.sh --describe --zookeeper master:2181/kafka
6. Produce data
bin/kafka-console-producer.sh --broker-list master:9092 --topic first
7. Consume data
Note: --zookeeper is deprecated for the consumer; recent versions use --bootstrap-server.
# Read only messages produced from now on
bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic first
# Read all messages from the beginning
bin/kafka-console-consumer.sh --bootstrap-server master:9092 --from-beginning --topic first
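The console consumer can also join a consumer group, which is handy for trying out step 9 below; the --group flag is supported in this version:
bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic first --group testgroup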
8. Check a topic's offsets
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list master:9092 --topic first
# Sample output
first:0:3
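GetOffsetShell prints topic:partition:offset and reports the latest offset per partition by default; passing --time -2 asks for the earliest instead:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list master:9092 --topic first --time -2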
9. Check how much of each partition of a topic has been consumed
My machine fails with "Error: Could not find or load main class kafka.tools.ConsumerOffsetChecker". That is expected: the class was removed in Kafka 2.0, so it does not exist in 2.2.0; see the replacement command after the sample output below.
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group testgroup --topic first --zookeeper master:2181/kafka
# Output of the old tool:
GROUP    TOPIC    PID    OFFSET    LOGSIZE    LAG
# GROUP = consumer group, TOPIC = topic name, PID = partition id,
# OFFSET = messages consumed so far, LOGSIZE = total messages, LAG = messages not yet consumed
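Since the class is gone, the supported replacement in Kafka 2.x is kafka-consumer-groups.sh, which prints per-partition current offset, log end offset, and lag for a group:
bin/kafka-consumer-groups.sh --bootstrap-server master:9092 --describe --group testgroup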
Change a topic's partition count (it can only be increased, never decreased):
bin/kafka-topics.sh --alter --zookeeper master:2181/kafka --partitions 5 --topic first
10. Change a topic's replica count
Write a file addReplicas.json yourself; its content is JSON, for example:
{
  "version": 1,
  "partitions": [
    { "topic": "test0", "partition": 0, "replicas": [1, 2] },
    { "topic": "test0", "partition": 1, "replicas": [1, 2, 3] },
    { "topic": "test0", "partition": 2, "replicas": [1, 2, 3] }
  ]
}
11. Run the reassignment command
bin/kafka-reassign-partitions.sh --zookeeper master:2181/kafka --reassignment-json-file addReplicas.json --execute
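The same tool reports progress when run with --verify instead of --execute:
bin/kafka-reassign-partitions.sh --zookeeper master:2181/kafka --reassignment-json-file addReplicas.json --verify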
Taking a broker offline is mainly a maintenance task; I run a single node, so it is not needed here.
12. Delete a topic
bin/kafka-topics.sh --delete --zookeeper master:2181/kafka --topic first
1. If delete.topic.enable is not set to true in server.properties, this is not a real delete; the topic is only marked: marked for deletion.
2. Delete the topic's directories on disk.
Find the log.dirs setting in server.properties and remove the first-related directories under it.
3. Log in with the ZooKeeper client.
Command:
/usr/local/zookeeper/bin/zkCli.sh
4. Delete the topic's znodes in ZooKeeper.
Commands (rmr is the recursive delete in the ZooKeeper 3.4 CLI; newer versions call it deleteall):
rmr /kafka/config/topics/first
rmr /kafka/brokers/topics/first
rmr /kafka/admin/delete_topics/first   (only needed while the topic is marked for deletion)
5. Restart ZooKeeper and the broker.
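Then confirm the topic is really gone:
bin/kafka-topics.sh --zookeeper master:2181/kafka --list
# first should no longer appear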
13. Installing Kafka Tool, a Kafka GUI client
Download: http://www.kafkatool.com/download.html
On Windows, Kafka Tool needs the broker hostname to resolve, so edit
C:\Windows\System32\drivers\etc\hosts
(if you lack permission on the file, copy it to the desktop, edit it there, then copy it back)
and add the following:
# IP and hostname of each node in the Kafka cluster
192.168.1.21 master
14. Connecting to Kafka from Python
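The examples below use the kafka-python package (imported as kafka); install it first if needed:
pip install kafka-python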
1. Producer:
from kafka import KafkaProducer

# bootstrap_servers may list several brokers, e.g. ['0.0.0.1:9092', '0.0.0.2:9092']
producer = KafkaProducer(bootstrap_servers=['172.21.10.136:9092'])
for i in range(3):
    msg = "msg%d" % i
    producer.send('test', msg.encode('utf-8'))  # message values must be bytes
producer.close()
2. Consumer (minimal demo):
from kafka import KafkaConsumer

consumer = KafkaConsumer('test', bootstrap_servers=['172.21.10.136:9092'])
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))
With both running, the producer and consumer exchange messages normally.
3. Consumer (consumer group):
from kafka import KafkaConsumer

consumer = KafkaConsumer('test',
                         group_id='my-group',
                         bootstrap_servers=['172.21.10.136:9092'])
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))
Start several of these consumers and only one of them receives any given message, which is exactly what we want: a consumer group scales out horizontally to raise processing capacity.
4. Consumer (read from the earliest available message):
from kafka import KafkaConsumer

consumer = KafkaConsumer('test',
                         auto_offset_reset='earliest',
                         bootstrap_servers=['172.21.10.136:9092'])
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))
auto_offset_reset controls where to start when there is no committed offset: earliest jumps to the oldest available message, latest to the newest; the default is latest.
Defined in the source as: {'smallest': 'earliest', 'largest': 'latest'}
5. Consumer (setting the offset manually):
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer('test', bootstrap_servers=['172.21.10.136:9092'])
print(consumer.partitions_for_topic("test"))  # partition info for the test topic
print(consumer.topics())                      # available topics
print(consumer.subscription())                # topics this consumer subscribes to
print(consumer.assignment())                  # topic/partitions assigned to this consumer
print(consumer.beginning_offsets(consumer.assignment()))  # earliest offsets this consumer can read
consumer.seek(TopicPartition(topic='test', partition=0), 5)  # reset the offset: consume from offset 5
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))
6. Consumer (subscribing to multiple topics):
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test', 'test0'))  # subscribe to the topics to consume
print(consumer.topics())
print(consumer.position(TopicPartition(topic='test', partition=0)))  # current offset on this topic/partition
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))
7. Consumer (polling for messages manually):
from kafka import KafkaConsumer
import time

consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test', 'test0'))
while True:
    msg = consumer.poll(timeout_ms=5)  # fetch whatever is available from kafka
    print(msg)
    time.sleep(1)
8. Consumer (pausing and resuming consumption):
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time

consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test',))  # note the trailing comma: a one-element tuple, not a bare string
consumer.topics()
consumer.pause(TopicPartition(topic='test', partition=0))
num = 0
while True:
    print(num)
    print(consumer.paused())  # partitions currently paused
    msg = consumer.poll(timeout_ms=5)
    print(msg)
    time.sleep(2)
    num = num + 1
    if num == 10:
        print("resume...")
        consumer.resume(TopicPartition(topic='test', partition=0))
        print("resume......")
After pause() is called the consumer reads nothing until resume() is called to restore consumption.