使用php连接操作kafka,从安装kafka到引入php扩展来操作kafka。
一、安装
注:需安装JDK
1.官网下载kafka
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
2.解压文件
tar -xzf kafka_2.11-2.1.0.tgz
cd kafka_2.11-2.1.0
3.启动ZooKeeper(注意:kafka包自带了zookeeper)
bin/zookeeper-server-start.sh config/zookeeper.properties
4.启动kafka
bin/kafka-server-start.sh config/server.properties
5.创建一个topic(名为test)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
可通过这个命令查看现有的topic列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
6.发送一些消息:生产者producer(利用kafka自带客户端)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
7.接收消息:消费者consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
到此,kafka单机版就安装完成了。
二、使用php连接kafka
(一)让kafka支持外网访问
1.修改config/下的server.properties
listeners=PLAINTEXT://192.168.230.130:9092
host.name=192.168.230.130 #这里的ip填成你的kafka的ip
继续修改
advertised.listeners=PLAINTEXT://192.168.230.130:9092
2.至此,配置完成,记得重启kafka服务,可以下载kafka tools来验证是否可以外网访问
注:配置完以后,操作脚本时记得改为--bootstrap-server 192.168.230.130:9092,否则连不上。
(二)集成kafka-php,支持连接kafka
1.使用的是https://github.com/weiboad/kafka-php/来连接kafka,优点是不需要安装
php拓展。使用composer安装
2.按照github给的示例代码连接即可,需要注意的是
$config->setMetadataBrokerList('192.168.230.130:9092');
这里填写的是kafka地址,不是zookeeper的。
3.offset可以这样设置
$config->setOffsetReset('earliest');
(三)遇到的坑
github给的consumer建议通过脚本运行,当脚本被kill掉以后,再次重启脚本时,发现消费的数据漏掉了一条。后来通过这样解决了问题:
首先,先关闭consumer脚本
记得脚本里这样设置
$config->setOffsetReset('earliest');
通过这个命令可以查看这个groupid当前的偏移量
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.230.130:9092 --describe --group test3
通过这个命令,将这个groupid的偏移量设置成最早的
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.230.130:9092 --reset-offsets --group test3 --topic test --to-earliest
再重新启动脚本,再消费数据时,就是正常的了,不会出现丢数据的情况;但是,在生产者持续写入时,还是有bug
后来发现是协议版本问题,换成这个就解决了:
$config->setBrokerVersion('1.0.0');