往kafka发送一条消息,大致需要如下步骤:
- 生成一个producer实例
$conf = new Rdkafka\Conf();
$conf->setErrorCb(function($producer, $msg) {
printf("%s: %s\n", rd_kafka_err2str($err), $errstr);
exit;
});
$conf->setDrMsgCb(function($producer, $msg) {
if($msg->err) {
throw new Exception('Message delivery failed:' . $msg->errstr());
} else {
echo 'sent message sucessfully.';
}
});
$producer = new Rdkafka\Producer($conf);
- 给producer绑定broker以及topic
$producer->addBrokers('192.168.11.134:9092,192.168.11.133:9092');
$topic = $producer->newTopic('test-topic');
- 发送消息
$topic->produce(RD_KAFKA_PARTITION_UA, 0, 'message payload');
- 通过flush阻塞当前进程,等待所有的producer请求任务完成
$producer->flush(-1);
注:在producer服务中,如果不调用flush,有丢消息的可能。另外如果设置了回调函数,一定要调用poll或者flush,否则回调函数有可能不被调用。
也可用如下方法:
<?php
$conf = new Rdkafka\Conf();
$conf->set('metadata.broker.list','192.168.11.134:9092,192.168.11.133:9092');
$producer = new Rdkafka\Producer($conf);
$topic = $producer->newTopic('test-topic');
for($i = 0; $i < 10; $i++) {
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
$producer->poll(0);
}
for($fRetries = 0; $fRetries < 10; $fRetries++) {
$res = $producer->flush(10000);
if($res === RD_KAFKA_RESP_ERR_NO_ERROR) {
echo 'write message successfully.';
break;
}
}
if(RD_KAFKA_RESP_ERR_NO_ERROR != $res) {
throw new \RuntimeException('unable to flush, message might lost.');
}