1、安装及启动:
2、业务逻辑:
- 每接受50条消息,插入一次数据库
- 若半小时内还未接受到50条消息,将已有的数据先插入
3、代码实现:
- 安装依赖:
composer require php-amqplib/php-amqplib
- 发送消息:
private function getConn()
{
return new AMQPStreamConnection(
C('RABBITMQ.host'),
C('RABBITMQ.port'),
C('RABBITMQ.login'),
C('RABBITMQ.password'),
C('RABBITMQ.vhost')
);
}
/**
* 此方法向rabbitmq服务发送模拟数据
*/
public function send()
{
$connection = $this->getConn();
/** @var AMQPChannel $channel */
$channel = $connection->channel();
$channel->exchange_declare(self::EXCHANGE, self::EXCHANGE_TYPE, false, true, false);
$body = '{"mailno":"3102478466' . rand(100, 999) . '","callTime":"2019-06-03 20:00:51","ringTime":"10","returnAnswerTime":' . rand(0, 50) . ',"businessType":"delivery"}';
$msg = new AMQPMessage($body);
$channel->basic_publish($msg, self::EXCHANGE, self::ROUTING_KEY);
$this->shutdown($channel, $connection);
}
- 接受数据 :
循环50次消息消费
利用超时异常解决半小时无消息进来的业务
/**
* @throws ErrorException
*/
public function receive()
{
$connection = $this->getConn();
/** @var AMQPChannel $channel */
$channel = $connection->channel();
$channel->exchange_declare(self::EXCHANGE, self::EXCHANGE_TYPE, false, true, false);
$channel->queue_declare(self::QUEUE, false, true, false, false);
$channel->queue_bind(self::QUEUE, self::EXCHANGE, self::ROUTING_KEY);
$iSql = '';
$iSqlPre = "INSERT ……";
$expirePeriod = 30 * 60;
// 每次处理50条数据
for ($i = 0; $i < self::PROCESS_NUM_PT; $i++) {
$channel->basic_consume(
self::QUEUE,
self::TAG . uniqid() . (string)$i,
false,
false,
false,
false,
function ($msg) use ($iSqlPre, $i, &$iSql) {
$this->process_message($msg, $iSqlPre, $i + 1, $iSql);
}
);
}
try {
// 30分钟内未接到新的消息则提前插入已存在的数据
while ($channel->is_consuming()) {
$channel->wait(null, false, $expirePeriod);
}
} catch (AMQPTimeoutException $e) {
Log::write(
sprintf(
"%s: %s, start inserting data due to no message in 30 minutes.",
self::TAG,
$e->getMessage()
)
);
if (!empty($iSql)) {
$iSql = sprintf($iSqlPre, rtrim($iSql, ','));
……
}
$this->shutdown($channel, $connection);
$this->receive();
}
$this->shutdown($channel, $connection);
}
/**
* @param AMQPMessage $message
* @param string $iSqlPre
* @param int $num
* @param string $iSql
*/
private function process_message($message, $iSqlPre, $num, &$iSql)
{
if (empty($message->body)) {
return;
}
/** @var AMQPChannel $channel */
$channel = $message->delivery_info['channel'];
// 1表示未更新,2表示已更新及已acknowledge
$state = 1;
try {
$data = json_decode($message->body, true);
if ($data['businessType'] != 'delivery') {
return;
}
// 更新
……
// 插入
$iSql .=
……
$channel->basic_ack($message->delivery_info['delivery_tag']);
$state = 2;
if ($num >= self::PROCESS_NUM_PT) {
$iSql = sprintf($iSqlPre, rtrim($iSql, ','));
……
$iSql = '';
}
} catch (Exception $e) {
if ($state == 1) {
$channel->basic_nack($message->delivery_info['delivery_tag'], true, true);
}
if ($num >= self::PROCESS_NUM_PT && (!empty($iSql))) {
$this->redis->lPush(sprintf('%s:%s', self::TAG, 'Exp'), $iSql);
$iSql = '';
}
Log::write(
sprintf(
"%s: %s, body of the message: %s",
self::TAG,
$e->getMessage(),
$message->body
)
);
}
}
- 循环函数嵌套(递归)次数限制
报错如下:
PHP Fatal error: Maximum function nesting level of '500' reached, aborting! in /mnt/d/…… on line 185
PHP Stack trace:……
解决方法:
namespace ……下加入下行代码(top观察了一会,没啥异常,暂不知道是否有隐患):
ini_set('xdebug.max_nesting_level', -1);
或者不使用receive自身嵌套,使用handle代替,每分钟执行一次handle,crontab -e 中加入:
* * * * * nohup php /……/index.php Module/Controller/handle > /dev/null &
/**
* @throws ErrorException
*/
public function handle()
{
if (!$this->redis->exists(self::EXPIRE_KEY)) {
$this->redis->set(self::EXPIRE_KEY,1);
$this->receive();
}
}
/**
* @throws ErrorException
*/
public function receive()
{
……
} catch (AMQPTimeoutException $e) {
$this->redis->del(self::EXPIRE_KEY);
Log::write(
sprintf(
"%s: %s, start inserting data due to no message in 30 minutes.",
self::TAG,
$e->getMessage()
)
);
if (!empty($iSql)) {
$iSql = sprintf($iSqlPre, rtrim($iSql, ','));
……
}
$this->shutdown($channel, $connection);
}
$this->shutdown($channel, $connection);
}