rocketmq-php-client 下载地址:https://github.com/zhongwenyu/rocketMQ
thinkphp6框架中开发代码:
<?php
/**
* RockerMq生产者
*/
declare (strict_types=1);
namespace app\controller\mq;
use think\Exception;
class RocketMqProducer extends Base
{
public function __construct()
{
}
/**
* 发送消息
*/
public function send()
{
$InstanceName = 'WmsStock';
$NamesrvAddr = '192.168.100.209:9876';
$producer = new \RocketMQ\Producer($InstanceName);
$producer->setInstanceName($InstanceName); //设置实例名
$producer->setNamesrvAddr($NamesrvAddr); //设置名字服务链接地址
//$producer->setTcpTransportPullThreadNum(40); //设置传出链接线程数 默认是cpu core的数值
//$producer->setTcpTransportConnectTimeout(3000); //设置tcp链接超时时间,单位是毫秒
//$producer->setTcpTransportTryLockTimeout(3000); //设置申请锁超时时间,单位是毫秒
//$producer->setSendMsgTimeout(1000); //发送消息超时时间,单位是毫秒
$producer->start();
//生产消息
$message = new \RocketMQ\Message('TIANMASPORT', 'MQ_TAG_SUPPLIER', '发送内容 hello world!');
$sendResult = $producer->send($message);
print_r($sendResult);
printf("|%-30s|%-40s|\n", "msgId", $sendResult->getMsgId());
printf("|%-30s|%-40s|\n", "offsetMsgId", $sendResult->getOffsetMsgId());
printf("|%-30s|%-40s|\n", "sendStatus", $sendResult->getSendStatus());
printf("|%-30s|%-40s|\n", "queueOffset", $sendResult->getQueueOffset());
printf("|%-30s|%-40s|\n", "body", $message->getBody());
$producer->shutdown();
}
}