kafka安装

kafka

  • 集群安装

#解压,创建软链接
cd /home/hadoop/app/
tar -zxvf kafka_2.11-0.9.0.1.tgz
ln -s kafka_2.11-0.9.0.1 kafka
rm -rf kafka_2.11-0.9.0.1.tgz            #删除安装文件,节省空间


#修改配置文件
cd kafka/config/
vim server.properties

broker.id=1            #每个机器上不同,如(broker.id=2, broker.id=3)
log.dirs=/export/servers/log/kafka                 #是kafka的日志文件目录,存放的是消息数据,以主题命名的分区
zookeeper.connect=zk03:2181,zk02:2181,zk01:2181

#################################################

#分发到其他机器
mkdir /export/servers/log/kafka -p
scp -rp ./kafka hdp-node-02:/home/hadoop/app/
scp -rp ./kafka hdp-node-03:/home/hadoop/app/


#启动
#依次在各节点上启动kafka
bin/kafka-server-start.sh  config/server.properties

#or

/usr/hadoop/kafka_2.11-0.10.2.0/bin/kafka-server-start.sh -daemon /usr/hadoop/kafka_2.11-0.10.2.0/config/server.properties

#后台启动
kafka-server-start.sh config/server.properties &
  • Kafka常用操作命令

查看当前服务器中的所有topic
             bin/kafka-topics.sh --list --zookeeper zk01:2181

创建topic 
bin/kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 3 --topic first

删除topic 
    bin/kafka-topics.sh --delete --zookeeper zk01:2181 --topic test 
需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。

通过shell命令发送消息 
    kafka-console-producer.sh --broker-list kafka01:9092 --topic itheima

通过shell消费消息
    bin/kafka-console-consumer.sh --zookeeper zk01:2181 --from-beginning --topic test1 
写--from-beginning会显示历史消息,如果只想显示最新的可以不写

查看消费位置  
    kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zk01:2181 --group testGroup

查看某个Topic的详情 
    kafka-topics.sh --topic test --describe --zookeeper zk01:2181

停止服务 
    bin/kafka-server-stop.sh 
添加分区
    bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic test_topic --partitions 3 sh bin./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkatest 
查看topic的偏移
    kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic kafkatest2 --time -1 sh kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic kafkatest2 --time -2
用ConsumerGroupCommand工具,我们可以使用list,describe,或delete消费者组(注意,删除只有在分组元数据存储在zookeeper的才可用)。当使用新消费者API(broker协调处理分区和重新平衡),当该组的最后一个提交的偏移到期时,该组被删除。 例如,要列出所有主题中的所有用户组:

查看所有的消费者组
    bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --list

    Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
        monitorwebsitegroup
        syslog
        ntaflowgroup
        aptgroup
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容