安装amqp
composer require hyperf/amqp
创建一个消费者
[ 消费消息 ]
php bin/hyperf.php gen:amqp-consumer DemoConsumer
<?php
declare(strict_types=1);
namespace App\Amqp\Consumer;
use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use PhpAmqpLib\Message\AMQPMessage;
/**
* @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", name="DemoConsumer", nums=1)
*/
#[Consumer(exchange: 'hyperf', routingKey: 'hyperf', queue: 'hyperf', name: "DemoConsumer", nums: 1)]
class DemoConsumer extends ConsumerMessage
{
public function consumeMessage($data, AMQPMessage $message): string
{
print_r($data);
return Result::ACK;
}
}
创建一个生产者
[ 投递消息 ]
php bin/hyperf.php gen:amqp-producer DemoProducer
<?php
declare(strict_types=1);
namespace App\Amqp\Producer;
use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Message\ProducerMessage;
/**
* @Producer(exchange="hyperf", routingKey="hyperf")
*/
#[Producer(exchange: 'hyperf', routingKey: 'hyperf')]
class DemoProducer extends ProducerMessage
{
public function __construct($data)
{
print_r($data);
$this->payload = $data;
}
}
创建生产者脚本
php bin/hyperf.php gen:command DelayCommand
<?php
declare(strict_types=1);
namespace App\Command;
use App\Amqp\Producer\DelayDirectProducer;
use App\Amqp\Producer\DemoProducer;
use Hyperf\Amqp\Producer;
use Hyperf\Command\Command as HyperfCommand;
use Hyperf\Command\Annotation\Command;
use Hyperf\Context\ApplicationContext as ContextApplicationContext;
use Hyperf\Utils\ApplicationContext;
use Psr\Container\ContainerInterface;
/**
* @Command
*/
#[Command]
class DelayCommand extends HyperfCommand
{
/**
* @var ContainerInterface
*/
protected $container;
public function __construct(ContainerInterface $container)
{
$this->container = $container;
parent::__construct('demo:queue');
}
public function configure()
{
parent::configure();
$this->setDescription('生产消息发布 ');
}
public function handle()
{
for ($i = 0; $i < 100; $i++) {
$data = [
"ss" => 122,
"num" => $i,
"number" => mt_rand(1111, 9999),
"time" => time(),
];
$message = new DemoProducer($data);
$producer = ContextApplicationContext::getContainer()->get(Producer::class);
$ret = $producer->produce($message);
}
print_r($ret);
$this->line('Hello Hyperf!', 'info');
exit();
}
}