rocketmq客户端-thinkphp6

rocketmq-php-client 下载地址:https://github.com/zhongwenyu/rocketMQ

thinkphp6框架中开发代码:

<?php

/**

* RockerMq生产者

*/

declare (strict_types=1);

namespace app\controller\mq;

use think\Exception;

class RocketMqProducer extends Base

{

    public function __construct()

{

}

    /**

    * 发送消息

    */

    public function send()

{

        $InstanceName = 'WmsStock';

        $NamesrvAddr  = '192.168.100.209:9876';

        $producer = new \RocketMQ\Producer($InstanceName);

        $producer->setInstanceName($InstanceName); //设置实例名

        $producer->setNamesrvAddr($NamesrvAddr); //设置名字服务链接地址

        //$producer->setTcpTransportPullThreadNum(40); //设置传出链接线程数 默认是cpu core的数值

        //$producer->setTcpTransportConnectTimeout(3000); //设置tcp链接超时时间,单位是毫秒

        //$producer->setTcpTransportTryLockTimeout(3000); //设置申请锁超时时间,单位是毫秒

        //$producer->setSendMsgTimeout(1000); //发送消息超时时间,单位是毫秒

        $producer->start();

        //生产消息

        $message    = new \RocketMQ\Message('TIANMASPORT', 'MQ_TAG_SUPPLIER', '发送内容 hello world!');

        $sendResult = $producer->send($message);

        print_r($sendResult);

        printf("|%-30s|%-40s|\n", "msgId", $sendResult->getMsgId());

        printf("|%-30s|%-40s|\n", "offsetMsgId", $sendResult->getOffsetMsgId());

        printf("|%-30s|%-40s|\n", "sendStatus", $sendResult->getSendStatus());

        printf("|%-30s|%-40s|\n", "queueOffset", $sendResult->getQueueOffset());

        printf("|%-30s|%-40s|\n", "body", $message->getBody());

        $producer->shutdown();

    }

}

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容