PHP安装使用kafka

1.安装java环境并配置环境变量
2.下载kafka安装包并解压
3.安装librdkafka库

git clone https://github.com/edenhill/librdkafka.git
 ./configure
 make
 sudo make install

4.安装php-rdkafka扩展

$ git clone https://github.com/arnaud-lb/php-rdkafka.git
 
#生成configure文件
$ /Users/shiyibo/LNMP/php/bin/phpize 
 
#编译安装
$ ./configure --with-php-config=/Users/shiyibo/LNMP/php/bin/php-config
$ make
$ make install 
 
#在php.ini 文件中配置 rdkafka扩展
$ vim /Users/shiyibo/LNMP/php/etc/php.ini
extension=rdkafka.so
 
#查看扩展是否生效
$php -m | grep kafka

5.守护模式启动zookeeper

nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

6.守护模式启动kafka

nohup bin/kafka-server-start.sh config/server.properties &

7.kafka客户端创建主题

bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

8.kafka客户端创建生产者

bin/kafka-console-producer.sh --topic quickstart-events --broker-list localhost:9092

9.kafka客户端创建消费者

bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092

10.php创建生产者

<?php

/**
 * 消息生产者
 */

$objRdKafka = new RdKafka\Producer();
//$objRdKafka->setLogLevel(LOG_DEBUG);
$objRdKafka->addBrokers("localhost:9092");

$oObjTopic = $objRdKafka->newTopic("ttt");

// 从终端接收输入
$oInputHandler = fopen('php://stdin', 'r');

while (true) {
    echo "\nEnter  messages:\n";
    $sMsg = trim(fgets($oInputHandler));

    // 空消息意味着退出
    if (empty($sMsg)) {
        break;
    }

    // 发送消息
    $oObjTopic->produce(RD_KAFKA_PARTITION_UA, 0, $sMsg);
}

echo "done\n";

11.php创建消费者

<?php

/**
 * 消费者消费消息
 */
$objRdKafka = new RdKafka\Consumer();
//$objRdKafka->setLogLevel(LOG_DEBUG);
$objRdKafka->addBrokers("localhost:9092");

$oObjTopic = $objRdKafka->newTopic("ttt");

/**
 * consumeStart
 *   第一个参数标识分区,生产者是往分区0发送的消息,这里也从分区0拉取消息
 *   第二个参数标识从什么位置开始拉取消息,可选值为
 *     RD_KAFKA_OFFSET_BEGINNING : 从开始拉取消息
 *     RD_KAFKA_OFFSET_END : 从当前位置开始拉取消息
 *     RD_KAFKA_OFFSET_STORED : 猜测跟RD_KAFKA_OFFSET_END一样
 */
$oObjTopic->consumeStart(0, RD_KAFKA_OFFSET_END);

while (true) {
    // 第一个参数是分区,第二个参数是超时时间
    $oMsg = $oObjTopic->consume(0, 1000);

    // 没拉取到消息时,返回NULL
    if (!$oMsg) {
        usleep(10000);
        continue;
    }

    if ($oMsg->err) {
        echo $oMsg->errstr(), "\n";
        break;
    } else {
        file_put_contents("kafka.log",$oMsg->payload.PHP_EOL,FILE_APPEND);
        echo $oMsg->payload, "\n";
    }
}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 使用php连接操作kafka,从安装kafka到引入php扩展来操作kafka。 一、安装 注:需安装JDK 1....
    __sk阅读 4,368评论 0 5
  • Kafka 是一种高吞吐的分布式消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高...
    daos阅读 6,461评论 1 3
  • 1. 概念 介绍 Kafka是一个分布式的、可分区的、可复制的消息系统。它提供了普通消息系统的功能,但具有自己独特...
    EmmaQin阅读 5,943评论 0 1
  • 一、kafka使用背景 1、Kafka使用背景 在我们大量使用分布式数据库、分布式计算集群的时候,是否会遇到这样的...
    开发者之路阅读 340评论 0 0
  • 久违的晴天,家长会。 家长大会开好到教室时,离放学已经没多少时间了。班主任说已经安排了三个家长分享经验。 放学铃声...
    飘雪儿5阅读 7,579评论 16 22