【译】PHP Kafka客户端 - php-rdkafka

本文翻译自php-rdkafka的README。项目地址

PHP-rdkafka是一个轻量的librdkafka封装,提供了一个 PHP 5 / PHP 7 Kafka客户端。
它支持高级和低级消费者、生产者,以及元数据API。
它的API尽可能地和librdkafka的保持相似,并被完整地记录在这里

安装

https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka.setup.html

示例

https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka.examples.html

用法

生产

创建生产者

为了生产,我们首先需要创建一个生产者,并且给它添加经纪人(Kafka服务器):

<?php
$conf = new RdKafka\Conf();
$conf->set('log_level', LOG_DEBUG);
$conf->set('debug', 'all');
$rk = new RdKafka\Producer($conf);
$rk->addBrokers("10.0.0.1:9092,10.0.0.2:9092");

生产消息

注意:确保你的生产者在合适的时候关闭,以避免信息的丢失。
然后,我们从生产者创建一个主题实例:

<?php

$topic = $rk->newTopic("test");

从上面开始,我们就可以使用生产方法生产任何我们想要的信息:

<?php

$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");

第一个参数是分区。RD_KAFKA_PARTITION_UA代表未赋值,让librdkafka去选择分区。
第二个参数是信息标记,应该是0或者RD_KAFKA_MSG_F_BLOCK,代表在整个队列上阻塞生产。
消息负载,可以是任何内容。

合理关闭

关闭应该在销毁生产者实例之前完成,以确保在终止之前完成所有排队中和运行中的生产请求。为$timeout_ms设置一个合理的值。
注意:不调用flush会导致消息丢失!

$rk->flush($timeout_ms);

如果你不在乎发送中的消息还没有发送完,你可以在调用flush之前调用purge:

// Forget messages that are not fully sent yet
$rk->purge(RD_KAFKA_PURGE_F_QUEUE);

$rk->flush($timeout_ms);

高级消费

RdKafka\KafkaConsumer类支持自动分区分配/撤销。请看这里的例子

低级消费

我们首先需要创建一个低级消费者,并且给它添加经纪人(Kafka服务器):

<?php
$conf = new Conf();
$conf->set('log_level', LOG_LEVEL);
$conf->set('debug', 'all');
$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("10.0.0.1,10.0.0.2");

然后,创建一个主题实例,并且从分区0开始消费:

<?php

$topic = $rk->newTopic("test");

// The first argument is the partition to consume from.
// The second argument is the offset at which to start consumption. Valid values
// are: RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED.
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);

接下来,取回消费到的消息:

<?php

while (true) {
    // The first argument is the partition (again).
    // The second argument is the timeout.
    $msg = $topic->consume(0, 1000);
    if (null === $msg) {
        continue;
    } elseif ($msg->err) {
        echo $msg->errstr(), "\n";
        break;
    } else {
        echo $msg->payload, "\n";
    }
}

低级消费(多主题/分区)

可以通过告诉librdkafka将来自这些主题/分区的所有消息转发到一个内部队列,然后从这个队列消费来实现:
创建队列:

<?php
$queue = $rk->newQueue();

添加主题/分区到队列:

<?php

$topic1 = $rk->newTopic("topic1");
$topic1->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $queue);
$topic1->consumeQueueStart(1, RD_KAFKA_OFFSET_BEGINNING, $queue);

$topic2 = $rk->newTopic("topic2");
$topic2->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $queue);

接下来,取回消费到的消息:

<?php

while (true) {
    // The only argument is the timeout.
    $msg = $queue->consume(1000);
    if (null === $msg) {
        continue;
    } elseif ($msg->err) {
        echo $msg->errstr(), "\n";
        break;
    } else {
        echo $msg->payload, "\n";
    }
}

使用存储的偏移量

librdkafka可以存储偏移量到本地文件或者经纪人。默认是本地文件,并且当你使用RD_KAFKA_OFFSET_STORED作为消费偏移量时,librdkafka就开始存储偏移量。
默认情况下,文件被创建在当前目录,基于主题和分区命名。通过设置offset.store.path可以改变这个目录。
其他有趣的属性是:auto.commit.interval.ms,auto.commit.enable,group.id,max.poll.interval.ms。

<?php

$topicConf = new RdKafka\TopicConf();
$topicConf->set("auto.commit.interval.ms", 1e3);

$topic = $rk->newTopic("test", $topicConf);

$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

有趣的配置参数

queued.max.messages.kbytes

默认情况下,librdkafka将为每个被消费的分区缓冲最多1GB的消息。
可以通过减小这个参数的值来降低内存使用量。

topic.metadata.refresh.sparse和topic.metadata.refresh.interval.ms

每个消费者和生产者实例都会以topic.metadata.refresh.interval.ms这个参数定义的间隔来取得主题元数据。由于你的librdkafka版本差异,这个参数的默认值可能是10秒或600秒。
librdkafka默认获取集群中所有主题的元数据。将topic.metadata.refresh.sparse设置成true可以保证librdkafka只获取它使用的的主题。
将topic.metadata.refresh.sparse设置成true,topic.metadata.refresh.interval.ms设置成600秒(±)可以有效地降低带宽,取决于消费者和主题的数量。

internal.termination.signal

这个设置允许librdkafka线程在librdkafka完成后立即终止。这将允许你的PHP进程/请求快速终止。

<?php
// once
pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
// any time
$conf->set('internal.termination.signal', SIGIO);

socket.blocking.max.ms (librdkafka < 1.0.0)

Maximum time a broker socket operation may block. A lower value improves responsiveness at the expense of slightly higher CPU usage.

减少这个设置的值可以提高关机速度。这个值定义了librdkafka在一次读取循环迭代中阻塞的最大时间,也定义了librdkafka主线程检查终止的频率。

queue.buffering.max.ms

这定义了librdkafka在发送一批消息之前的最大等待时间和默认时间。将此设置减少到例如1ms,可以确保消息尽快发送。
这可以减少rdkafka实例和PHP进程/请求的关闭时间。

性能/低延迟设置

这是一个为低延迟而优化的配置。这允许PHP进程/请求尽快发送消息并快速终止。

<?php

$conf = new \RdKafka\Conf();
$conf->set('socket.timeout.ms', 50); // or socket.blocking.max.ms, depending on librdkafka version
if (function_exists('pcntl_sigprocmask')) {
    pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
    $conf->set('internal.termination.signal', SIGIO);
} else {
    $conf->set('queue.buffering.max.ms', 1);
}

$producer = new \RdKafka\Producer($conf);
$consumer = new \RdKafka\Consumer($conf);

生产后的轮询也可以很重要地减少终止时间:

$producer->produce(...);
while ($producer->getOutQLen() > 0) {
    $producer->poll(1);
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容