去年 Mix PHP V1 发布时,我写了一个多进程的邮件发送实例: 使用 mixphp 打造多进程异步邮件发送,今年 Mix PHP V2 发布,全面的协程支持让我们可以使用一个进程就可达到之前多个进程都无法达到的更高 IO 性能,所以今天重写一个协程池版本的邮件发送实例。
邮件发送是很常见的需求,由于发送邮件的操作一般是比较耗时的,所以我们一般采用异步处理来提升用户体验,而异步通常我们使用消息队列来实现。
下面演示一个异步邮件发送系统的开发过程,涉及知识点:
- 异步
- 消息队列
- 守护进程
- 协程池
如何使用消息队列实现异步
PHP 使用消息队列通常是使用中间件来实现,常用的消息中间件有:
- redis
- rabbitmq
- kafka
本次我们选用 Redis 来实现异步邮件发送,Redis 的数据类型中有一个 list 类型,可实现消息队列,使用以下命令:
// 入列
$redis->lpush($key, $data);
// 出列
$data = $redis->rpop($key);
// 阻塞出列
$data = $redis->brpop($key, 10);
架构设计
本实例由传统 MVC 框架投递邮件发送需求(生产者),Mix PHP 编写的守护程序执行发送任务(消费者)。
邮件发送库选型
以往我们通常使用框架提供的邮件发送库,或者网上下载别的用户分享的库,composer 出现后,https://packagist.org/ 上有大量优质的库,我们只需选择一个最好的即可,本例选择 swiftmailer。
由于发送任务是由 Mix PHP 执行,所以 swiftmailer 是安装在 Mix PHP 项目中,在项目根目录中执行以下命令安装:
composer require swiftmailer/swiftmailer
生产者开发
在邮件发送这个需求中生产者是指投递发送任务的一方,这一方通常是一个接口或网页,这个部分并不一定需 Mix PHP 开发,TP、CI、YII 这些都可以,只需在接口或网页中把任务信息投递到消息队列中即可。
在传统 MVC 框架的控制器中增加如下代码:
通常框架中使用 Redis 会安装一个类库来使用,本例使用原生代码,便于理解。
// 连接
$redis = new \Redis();
if (!$redis->connect('127.0.0.1', 6379)) {
throw new \Exception('Redis connect failed.');
}
$redis->auth('');
$redis->select(0);
// 投递任务
$data = [
'to' => '***@qq.com',
'body' => 'The message content',
'subject' => 'The title content',
];
$redis->lpush('queue:email', serialize($data));
通常异步开发中,投递完成后就会立即响应一个消息给用户,当然此时该任务并没有在生产者中执行,而是待消息被消费者获取后才执行。
消费者开发
使用本例时,请确保你使用的 Swoole 编译时开启了 openssl
本例我们采用 Mix PHP V2 的守护程序、协程池来完成一个超高性能的邮件发送程序。
因为我们是开发一个守护程序,所以我们在 applications/daemon
模块中开发,首先我们在配置 applications/daemon/config/main.php
中注册一个命令:
// 命令
'commands' => [
'mailer' => ['Mailer', 'description' => 'Mailer daemon.'],
],
注册的命令中指定的 Mailer 命令类,接下来我们编写一个 MailerCommand 类:
applications/daemon/src/Commands/MailerCommand.php
<?php
namespace Daemon\Commands;
use Daemon\Libraries\MailerWorker;
use Mix\Concurrent\CoroutinePool\Dispatcher;
use Mix\Core\Coroutine\Channel;
use Mix\Helper\ProcessHelper;
/**
* Class MailerCommand
* @package Daemon\Commands
* @author liu,jian <coder.keda@gmail.com>
*/
class MailerCommand
{
/**
* 退出
* @var bool
*/
public $quit = false;
/**
* 主函数
*/
public function main()
{
// 捕获信号
ProcessHelper::signal([SIGHUP, SIGINT, SIGTERM, SIGQUIT], function ($signal) {
$this->quit = true;
ProcessHelper::signal([SIGHUP, SIGINT, SIGTERM, SIGQUIT], null);
});
// 协程池执行任务
xgo(function () {
$maxWorkers = 20;
$maxQueue = 20;
$jobQueue = new Channel($maxQueue);
$dispatch = new Dispatcher([
'jobQueue' => $jobQueue,
'maxWorkers' => $maxWorkers,
]);
$dispatch->start(MailerWorker::class);
// 投放任务
$redis = app()->redisPool->getConnection();
while (true) {
if ($this->quit) {
$dispatch->stop();
return;
}
try {
$data = $redis->brPop(['queue:email'], 3);
} catch (\Throwable $e) {
$dispatch->stop();
return;
}
if (!$data) {
continue;
}
$data = array_pop($data); // brPop命令最后一个键才是值
$jobQueue->push($data);
}
});
}
}
从
$data = $redis->brPop(['queue:email'], 3);
外部的异常捕获可得知,当 Redis 连接出错时,比如 Redis 重启、连接异常时协程池会安全退出,也就是说当进程异常退出后用户需使用supervisor
、pm2
等工具重启守护进程。
上面是一个 Mix PHP 协程池的使用代码,基本可以直接复制使用,框架默认包含了协程池的 Demo,本次实例只是修改了协程池的 Worker,本命令主要是完成从 Redis 队列中获取消息然后 push 到 jobQueue 中,jobQueue 中的数据会被 20 个 Worker 实例中某一个抢占后并行执行,本例的邮件发送代码逻辑就在 MailerWorker 类中:
applications/daemon/src/Libraries/MailerWorker.php
<?php
namespace Daemon\Libraries;
use Mix\Concurrent\CoroutinePool\AbstractWorker;
use Mix\Concurrent\CoroutinePool\WorkerInterface;
/**
* Class MailerWorker
* @package Daemon\Libraries
* @author liu,jian <coder.keda@gmail.com>
*/
class MailerWorker extends AbstractWorker implements WorkerInterface
{
/**
* 邮件发送器
* @var Mailer
*/
public $mailer;
/**
* 初始化事件
*/
public function onInitialize()
{
parent::onInitialize(); // TODO: Change the autogenerated stub
// 实例化一些需重用的对象
$this->mailer = new Mailer();
}
/**
* 处理
* @param $data
*/
public function handle($data)
{
// TODO: Implement handle() method.
$data = unserialize($data);
if (empty($data)) {
return;
}
try {
$this->mailer->send($data['to'], $data['subject'], $data['body']);
app()->log->info("Mail sent successfully:to {to} subject {subject}", $data);
} catch (\Throwable $e) {
app()->log->error("Mail failed to send:to {to} subject {subject} error {error}", array_merge($data, ['error' => $e->getMessage()]));
}
}
}
由以上代码可见,Worker 在初始化时,新增了一个 Mailer 类的属性,当 jobQueue 消息投递过来时消息会传递到 handle 方法,在该方法中使用 Mailer 类的实例完成邮件发送任务,所以我们要编写了一个 Mailer 发送程序:
applications/daemon/src/Libraries/Mailer.php
<?php
namespace Daemon\Libraries;
use Mix\Core\Coroutine;
/**
* Class Mailer
* @package Daemon\Libraries
* @author liu,jian <coder.keda@gmail.com>
*/
class Mailer
{
/**
* 配置信息
*/
const HOST = 'smtpdm.aliyun.com';
const PORT = 465;
const SECURITY = 'ssl';
const USERNAME = '***';
const PASSWORD = '***';
/**
* Mailer constructor.
*/
public function __construct()
{
// 开启协程钩子
Coroutine::enableHook();
}
/**
* 发送
* @param $to
* @param $subject
* @param $body
* @return int
*/
public function send($to, $subject, $body)
{
// Create the Transport
$transport = (new \Swift_SmtpTransport(self::HOST, self::PORT, self::SECURITY))
->setUsername(self::USERNAME)
->setPassword(self::PASSWORD);
// Create the Mailer using your created Transport
$mailer = new \Swift_Mailer($transport);
// Create a message
$message = (new \Swift_Message($subject))
->setFrom([self::USERNAME => '**网'])
->setTo($to)
->setBody($body);
// Send the message
return $mailer->send($message);
}
}
在 Mailer 发送程序中我们使用了前面 composer 安装的 swiftmailer 库来发送邮件,以上就完成了全部的代码逻辑,现在我们开始测试。
先启动消费者守护程序:
[root@localhost bin]# ./mix-daemon mailer
将上文的生产者脚本命名为 push.php
然后在 CLI 中执行 (开一个新终端):
[root@localhost bin]# php /tmp/push.php
消费者守护程序结果:
[root@localhost bin]# ./mix-daemon mailer
[info] 2019-04-15 11:48:36 [message] Mail sent successfully:to ***@qq.com subject The title content
命令行终端打印了发送成功的日志,发送完成。