大数据单机部署-kafka

版本:

kafka_2.12-2.2.0.tgz

1.进入kafka_2.12-2.2.0.tgz 所在的目录,解压移动

1).解压移动:

tar -zxvf kafka_2.12-2.2.0.tgz
sudo mv kafka_2.12-2.2.0  /usr/local/kafka

2).配置环境:

export JAVA_HOME=/usr/local/java
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib

export HADOOP_HOME=/usr/local/hadoop
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib:$HADOOP_COMMON_LIB_NATIVE_DIR"
export HIVE_HOME=/usr/local/hive
export HIVE_CONF_DIR=${HIVE_HOME}/conf
export HCAT_HOME=$HIVE_HOME/hcatalog
export HIVE_DEPENDENCY=/usr/local/hive/conf:/usr/local/hive/lib/*:/usr/local/hive/hcatalog/share/hcatalog/hive-hcatalog-pig-adapter-2.3.5.jar:/usr/local/hive/hcatalog/share/hcatalog/hive-hcatalog-core-2.3.5.jar:/usr/local/hive/hcatalog/share/hcatalog/hive-hcatalog-server-extensions-2.3.5.jar:/usr/local/hive/hcatalog/share/hcatalog/hive-hcatalog-streaming-2.3.5.jar:/usr/local/hive/lib/hive-exec-2.3.5.jar

export ZOOKEEPER_HOME=/usr/local/zookeeper/
export KAFKA_HOME=/usr/local/kafka
export HBASE_HOME=/usr/local/hbase
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HIVE_HOME/bin:$HCAT_HOME/bin:$HBASE_HOME/bin:$ZOOKEEPER_HOME:$KAFKA_HOME
export SCALA_HOME=/usr/local/scala
export PATH=.:${JAVA_HOME}/bin:${SCALA_HOME}/bin:$PATH

export SPARK_HOME=/usr/local/spark
export PATH=.:${JAVA_HOME}/bin:${SCALA_HOME}/bin:${SPARK_HOME}/bin:$PATH

export KYLIN_HOME=/usr/local/kylin
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HIVE_HOME/bin:$HCAT_HOME/bin:$HBASE_HOME/bin:$ZOOKEEPER_HOME:$KAFKA_HOME:$KYLIN_HOME/bin
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:${HIVE_HOME}/lib:$HBASE_HOME/lib:$KYLIN_HOME/lib

2.配置kafka/config中的server.properties

#清空命令
echo >server.properties

写入如下内容:

broker.id=0
listeners=PLAINTEXT://192.168.1.21:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.1.21:2181/kafka      #有的教程是没有/kafka的,所以后面的命令也没有,这个是代表在zookeeper里创建一个kafka文件夹,包含kafka启动创建topics等文件
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enble=true           

delete.topic.enble=true -----如果不指定这个参数,执行删除操作只是标记删除

3.启动kafka:

cd /usr/local/kafka
nohup bin/kafka-server-start.sh config/server.properties&
#查看nohup文件有没有错误信息,没错就没问题。

4.创建和查看topics

#创建一个名为first的topic
bin/kafka-topics.sh --zookeeper master:2181/kafka  --create --replication-factor 1 --partitions 1 --topic first
#查看所有topic
bin/kafka-topics.sh --zookeeper master:2181/kafka --list

5.查看topic的详细信息

#单个topic
bin/kafka-topics.sh --describe --zookeeper master:2181/kafka --topic first
#全部topic
bin/kafka-topics.sh --describe --zookeeper master:2181/kafka

6.生产数据

bin/kafka-console-producer.sh --broker-list master:9092 --topic first

7.消费数据

注意:--zookeeper是一个过时的操作,最新版使用--bootstrap-server

#查看单消息
bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic first
#查看全部消息
bin/kafka-console-consumer.sh --bootstrap-server master:9092 --from-beginning --topic first

8.查看topic消费到的offset

bin/kafka-run-class.sh  kafka.tools.GetOffsetShell --broker-list master:9092 --topic first
#运行结果
irst
first:0:3

9.查看topic各个分区的消息的消息

我的主机报错--错误: 找不到或无法加载主类 kafka.tools.ConsumerOffsetChecker,有空再解决

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group testgroup --topic first --zookeeper master:2181/kafka

#运行结果:
GROUP     TOPIC      PID                 OFFSET                   LOGSIZE   LAG
消费者组   topic 名字  partition id    当前已消费的条数   总条数        未消费的条数

修改topic的分区partition数量(只能增加不能减少)

bin/kafka-topics.sh --alter --zookeeper master:2181/kafka --partitions 5 --topic first

10.修改 topic 的副本数

自己写一个文件 addReplicas.json,文件的内容是 JSON 格式的,比如:

{
   "version": 1,
   "partitions": [
       {
           "topic": "test0",
           "partition": 0,
           "replicas": [
                1,2
           ]
       },
       {
           "topic": "test0",
           "partition": 1,
           "replicas": [
                1,2,3
           ]
       },
       {
           "topic": "test0",
           "partition": 2,
           "replicas": [
                1,2,3
           ]
       }
    ]
}

11.执行修改命令

bin/kafka-reassign-partitions.sh --zookeeper 127.0.0.1:2181--reassignment-json-file addReplicas.json --execute

下线 broker,主要是为了维护运维,不过我是单机,没必要

12.删除topic

bin/kafka-topics.sh --delete --zookeeper master:2181/kafka --topic first


1.如果 server.properties 中没有把 delete.topic.enable 设为 true,那么此时的删除并不是真正的删除,而是把 topic 标记为:marked for deletion
2,删除 kafka 中该 topic 相关的目录。
在 server.properties 中找到配置 log.dirs,把该目录下  first 相关的目录删掉
3,登录 zookeeper client。
命令:
/usr/local/zookeeper/bin/zkCli.sh
4,删除 zookeeper 中该 topic 相关的目录
命令:
rm -r /kafka/config/topics/ first
rm -r /kafka/brokers/topics/first
rm -r /kafka/admin/delete_topics/ first(topic 被标记为 marked for deletion 时需要这个命令)
5,重启 zookeeper 和 broker

13.kafka 可视化客户端工具(Kafka Tool)安装使用

下载地址:http://www.kafkatool.com/download.html

如果是windows下安装kafkatool需要
修改 C:\Windows\System32\drivers\etc\hosts
如果hosts文件没有权限先复制到桌面修改再粘贴覆盖回去
增加内容如下:

#kafka集群的ip以及hostname
192.168.1.31    master

14.python连接kafka

1、生产者:
from kafka import KafkaProducer
producer = KafkaProducer (bootstrap_servers=['172.21.10.136:9092'])  #此处 ip 可以是多个 ['0.0.0.1:9092','0.0.0.2:9092','0.0.0.3:9092' ]
for i in range(3):
    msg = "msg%d" % i
    producer.send('test', msg)
producer.close()



2、消费者 (简单 demo):
from kafka import KafkaConsumer
consumer = KafkaConsumer('test',bootstrap_servers=['172.21.10.136:9092'])
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

启动后生产者、消费者可以正常消费。


3、消费者 (消费群组)
from kafka import KafkaConsumer
consumer = KafkaConsumer('test',
                         group_id='my-group',
                         bootstrap_servers=['172.21.10.136:9092'])
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))
                                          
启动多个消费者,只有其中可以可以消费到,满足要求,消费组可以横向扩展提高处理能力


4、消费者 (读取目前最早可读的消息)
from kafka import KafkaConsumer
consumer = KafkaConsumer('test',
                         auto_offset_reset='earliest',
                         bootstrap_servers=['172.21.10.136:9092'])
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))
                                         
auto_offset_reset: 重置偏移量,earliest 移到最早的可用消息,latest 最新的消息,默认为 latest
源码定义:{'smallest': 'earliest', 'largest': 'latest'}


5、消费者 (手动设置偏移量)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer('test',
                         bootstrap_servers=['172.21.10.136:9092'])
print consumer.partitions_for_topic ("test")  #获取 test 主题的分区信息
print consumer.topics ()  #获取主题列表
print consumer.subscription ()  #获取当前消费者订阅的主题
print consumer.assignment ()  #获取当前消费者 topic、分区信息
print consumer.beginning_offsets (consumer.assignment ()) #获取当前消费者可消费的偏移量
consumer.seek (TopicPartition (topic=u'test', partition=0), 5)  #重置偏移量,从第 5 个偏移量消费
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))


   
6、消费者 (订阅多个主题)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe (topics=('test','test0'))  #订阅要消费的主题
print consumer.topics()
print consumer.position (TopicPartition (topic=u'test', partition=0)) #获取当前主题的最新偏移量
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))
                                          
7、消费者 (手动拉取消息)
from kafka import KafkaConsumer
import time
consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test','test0'))
while True:
    msg = consumer.poll (timeout_ms=5)   #从 kafka 获取消息
    print msg
    time.sleep(1)

8、消费者 (消息挂起与恢复)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time
consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test'))
consumer.topics()
consumer.pause(TopicPartition(topic=u'test', partition=0))
num = 0
while True:
    print num
    print consumer.paused ()   #获取当前挂起的消费者
    msg = consumer.poll(timeout_ms=5)
    print msg
    time.sleep(2)
    num = num + 1
    if num == 10:
        print "resume..."
        consumer.resume(TopicPartition(topic=u'test', partition=0))
        print "resume......"
        
pause 执行后,consumer 不能读取,直到调用 resume 后恢复。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342