Workerman 源码业务逻辑分析
目录
项目概述
Workerman 是一个高性能的 PHP Socket 服务器框架,采用事件驱动、非阻塞 I/O 模型。核心特点:
- 多进程架构:Master-Worker 进程模型
- 事件驱动:基于 EventLoop 实现异步非阻塞
- 多协议支持:TCP、UDP、HTTP、WebSocket 等
- 高并发:单进程可处理数万连接
核心架构
架构层次
┌─────────────────────────────────────┐
│ Worker (进程管理) │
│ - 进程fork/监控 │
│ - Socket监听 │
│ - 信号处理 │
└──────────────┬──────────────────────┘
│
┌──────────────▼──────────────────────┐
│ EventLoop (事件循环) │
│ - Select/Event/Swoole/Swow/Fiber │
│ - 事件注册/分发 │
└──────────────┬──────────────────────┘
│
┌──────────────▼──────────────────────┐
│ Connection (连接管理) │
│ - TcpConnection │
│ - UdpConnection │
│ - AsyncTcpConnection │
└──────────────┬──────────────────────┘
│
┌──────────────▼──────────────────────┐
│ Protocol (协议处理) │
│ - Http/WebSocket/Text/Frame │
└─────────────────────────────────────┘
EventLoop 事件循环详解
EventLoop 是 Workerman 的核心,负责监听和分发 I/O 事件。Workerman 支持多种 EventLoop 实现,通过 EventInterface 统一接口。
EventInterface 接口定义
// src/Events/EventInterface.php
interface EventInterface
{
// 延迟执行
public function delay(float $delay, callable $func, array $args = []): int;
// 重复执行
public function repeat(float $interval, callable $func, array $args = []): int;
// 监听可读事件
public function onReadable($stream, callable $func): void;
// 监听可写事件
public function onWritable($stream, callable $func): void;
// 监听信号
public function onSignal(int $signal, callable $func): void;
// 运行事件循环
public function run(): void;
// 停止事件循环
public function stop(): void;
}
EventLoop 实现选择
在 Worker::initGlobalEvent() 中自动选择:
protected static function initGlobalEvent(): void
{
if (static::$globalEvent !== null) {
static::$eventLoopClass = get_class(static::$globalEvent);
static::$globalEvent = null;
return;
}
if (!empty(static::$eventLoopClass)) {
if (!is_subclass_of(static::$eventLoopClass, EventInterface::class)) {
throw new RuntimeException(sprintf('%s::$eventLoopClass must implement %s', static::class, EventInterface::class));
}
return;
}
static::$eventLoopClass = match (true) {
extension_loaded('event') => Event::class,
default => Select::class,
};
}
优先级:
- 如果安装了
event扩展 → 使用Event(libevent) - 否则 → 使用
Select(PHP stream_select)
1. Select EventLoop(默认实现)
位置:src/Events/Select.php
实现原理:基于 PHP 的 stream_select() 系统调用
核心代码解析:
public function run(): void
{
while ($this->running) {
$read = $this->readFds;
$write = $this->writeFds;
$except = $this->exceptFds;
if ($read || $write || $except) {
// Waiting read/write/signal/timeout events.
try {
@stream_select($read, $write, $except, 0, $this->selectTimeout);
} catch (Throwable) {
// do nothing
}
} else {
$this->selectTimeout >= 1 && usleep($this->selectTimeout);
}
foreach ($read as $fd) {
$fdKey = (int)$fd;
if (isset($this->readEvents[$fdKey])) {
$this->readEvents[$fdKey]($fd);
}
}
foreach ($write as $fd) {
$fdKey = (int)$fd;
if (isset($this->writeEvents[$fdKey])) {
$this->writeEvents[$fdKey]($fd);
}
}
foreach ($except as $fd) {
$fdKey = (int)$fd;
if (isset($this->exceptEvents[$fdKey])) {
$this->exceptEvents[$fdKey]($fd);
}
}
if ($this->nextTickTime > 0) {
if (microtime(true) >= $this->nextTickTime) {
$this->tick();
} else {
$this->selectTimeout = (int)(($this->nextTickTime - microtime(true)) * 1000000);
}
}
// The $this->signalEvents are empty under Windows, make sure not to call pcntl_signal_dispatch.
if ($this->signalEvents) {
// Calls signal handlers for pending signals
pcntl_signal_dispatch();
}
}
}
特点:
- ✅ 纯 PHP 实现,无需扩展
- ✅ 跨平台(Windows/Linux)
- ❌ 连接数限制:Linux 1024,Windows 256
- ❌ 性能相对较低
定时器实现:使用 SplPriorityQueue 优先级队列管理定时任务
protected function tick(): void
{
$tasksToInsert = [];
while (!$this->scheduler->isEmpty()) {
$schedulerData = $this->scheduler->top();
$timerId = $schedulerData['data'];
$nextRunTime = -$schedulerData['priority'];
$timeNow = microtime(true);
$this->selectTimeout = (int)(($nextRunTime - $timeNow) * 1000000);
if ($this->selectTimeout <= 0) {
$this->scheduler->extract();
if (!isset($this->eventTimer[$timerId])) {
continue;
}
// [func, args, timer_interval]
$taskData = $this->eventTimer[$timerId];
if (isset($taskData[2])) {
$nextRunTime = $timeNow + $taskData[2];
$tasksToInsert[] = [$timerId, -$nextRunTime];
} else {
unset($this->eventTimer[$timerId]);
}
$this->safeCall($taskData[0], $taskData[1]);
} else {
break;
}
}
foreach ($tasksToInsert as $item) {
$this->scheduler->insert($item[0], $item[1]);
}
if (!$this->scheduler->isEmpty()) {
$schedulerData = $this->scheduler->top();
$nextRunTime = -$schedulerData['priority'];
$this->setNextTickTime($nextRunTime);
return;
}
$this->setNextTickTime(0);
}
2. Event EventLoop(libevent)
位置:src/Events/Event.php
实现原理:基于 libevent 扩展(高性能 C 库)
核心代码解析:
public function run(): void
{
$this->eventBase->loop();
}
特点:
- ✅ 高性能(C 实现)
- ✅ 支持大量连接(无 1024 限制)
- ❌ 需要安装 event 扩展
- ❌ Windows 支持有限
事件注册示例:
public function onReadable($stream, callable $func): void
{
$className = $this->eventClassName;
$fdKey = (int)$stream;
$event = new $className($this->eventBase, $stream, $className::READ | $className::PERSIST, $func);
if ($event->add()) {
$this->readEvents[$fdKey] = $event;
}
}
3. Swoole EventLoop
位置:src/Events/Swoole.php
实现原理:基于 Swoole 扩展的事件循环
核心代码解析:
public function run(): void
{
// Avoid process exit due to no listening
Timer::tick(100000000, static fn() => null);
Event::wait();
}
特点:
- ✅ 协程支持(自动 Hook)
- ✅ 高性能
- ❌ 需要安装 Swoole 扩展
协程 Hook:
public function __construct()
{
Coroutine::set(['hook_flags' => SWOOLE_HOOK_ALL]);
}
4. Swow EventLoop
位置:src/Events/Swow.php
实现原理:基于 Swow 扩展的协程事件循环
核心代码解析:
public function onReadable($stream, callable $func): void
{
$fd = (int)$stream;
if (isset($this->readEvents[$fd])) {
$this->offReadable($stream);
}
Coroutine::run(function () use ($stream, $func, $fd): void {
try {
$this->readEvents[$fd] = Coroutine::getCurrent();
while (true) {
if (!is_resource($stream)) {
$this->offReadable($stream);
break;
}
// Under Windows, setting a timeout is necessary; otherwise, the accept cannot be listened to.
// Setting it to 1000ms will result in a 1-second delay for the first accept under Windows.
if (!isset($this->readEvents[$fd]) || $this->readEvents[$fd] !== Coroutine::getCurrent()) {
break;
}
$rEvent = stream_poll_one($stream, STREAM_POLLIN | STREAM_POLLHUP, 1000);
if ($rEvent !== STREAM_POLLNONE) {
$this->safeCall($func, [$stream]);
}
if ($rEvent !== STREAM_POLLIN && $rEvent !== STREAM_POLLNONE) {
$this->offReadable($stream);
break;
}
}
} catch (RuntimeException) {
$this->offReadable($stream);
}
});
}
特点:
- ✅ 原生协程支持
- ✅ 高性能
- ❌ 需要安装 Swow 扩展
5. Fiber EventLoop(Revolt)
位置:src/Events/Fiber.php
实现原理:基于 Revolt EventLoop(PHP 8.1+ Fiber)
核心代码解析:
public function run(): void
{
$this->driver->run();
}
特点:
- ✅ 纯 PHP 实现(PHP 8.1+)
- ✅ 协程支持
- ❌ 需要 PHP 8.1+
6. Ev EventLoop
位置:src/Events/Ev.php
实现原理:基于 libev 扩展
特点:
- ✅ 高性能
- ❌ 需要安装 ev 扩展
Worker 进程管理
Worker 类核心职责
- 进程管理:Master-Worker 模型
- Socket 监听:创建和监听服务器 Socket
- 信号处理:处理进程信号(reload、stop 等)
- 事件循环初始化:创建全局 EventLoop
启动流程
public static function runAll(): void
{
try {
static::checkSapiEnv();
static::initStdOut();
static::init();
static::parseCommand();
static::checkPortAvailable();
static::lock();
static::daemonize();
static::initWorkers();
static::installSignal();
static::saveMasterPid();
static::lock(LOCK_UN);
static::displayUI();
static::forkWorkers();
static::resetStd();
static::monitorWorkers();
} catch (Throwable $e) {
static::log($e);
}
}
流程说明:
-
checkSapiEnv()- 检查运行环境(必须是 CLI) -
init()- 初始化全局变量、EventLoop -
parseCommand()- 解析命令行参数(start/stop/reload) -
daemonize()- 守护进程化(可选) -
initWorkers()- 初始化所有 Worker 实例 -
forkWorkers()- Fork 子进程 -
monitorWorkers()- 监控子进程(主进程循环)
进程 Fork
Linux 实现:
protected static function forkOneWorkerForLinux(self $worker): void
{
// Get available worker id.
$id = static::getId($worker->workerId, 0);
$pid = pcntl_fork();
// For master process.
if ($pid > 0) {
static::$pidMap[$worker->workerId][$pid] = $pid;
static::$idMap[$worker->workerId][$id] = $pid;
} // For child processes.
elseif (0 === $pid) {
srand();
mt_srand();
static::$gracefulStop = false;
if (static::$status === static::STATUS_STARTING) {
static::resetStd();
}
static::$pidsToRestart = static::$pidMap = [];
// Remove other listener.
foreach (static::$workers as $key => $oneWorker) {
if ($oneWorker->workerId !== $worker->workerId) {
$oneWorker->unlisten();
unset(static::$workers[$key]);
}
}
Timer::delAll();
//Update process state.
static::$status = static::STATUS_RUNNING;
// Register shutdown function for checking errors.
register_shutdown_function(static::checkErrors(...));
// Create a global event loop.
if (static::$globalEvent === null) {
static::$eventLoopClass = $worker->eventLoop ?: static::$eventLoopClass;
static::$globalEvent = new static::$eventLoopClass();
static::$globalEvent->setErrorHandler(function ($exception) {
static::stopAll(250, $exception);
});
}
// Reinstall signal.
static::reinstallSignal();
// Init Timer.
Timer::init(static::$globalEvent);
restore_error_handler();
static::setProcessTitle('WorkerMan: worker process ' . $worker->name . ' ' . $worker->getSocketName());
$worker->setUserAndGroup();
$worker->id = $id;
$worker->run();
// Main loop.
static::$globalEvent->run();
if (static::$status !== self::STATUS_SHUTDOWN) {
$err = new Exception('event-loop exited');
static::log($err);
exit(250);
}
exit(0);
} else {
throw new RuntimeException("forkOneWorker fail");
}
}
关键点:
- 子进程创建独立的 EventLoop
- 子进程只保留自己的 Worker 实例
- 子进程进入
$globalEvent->run()事件循环
Socket 监听
public function listen(bool $autoAccept = true): void
{
if (!$this->socketName) {
return;
}
if (!$this->mainSocket) {
$localSocket = $this->parseSocketAddress();
// Flag.
$flags = $this->transport === 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN;
$errNo = 0;
$errMsg = '';
// SO_REUSEPORT.
if ($this->reusePort && DIRECTORY_SEPARATOR !== '\\') {
stream_context_set_option($this->socketContext, 'socket', 'so_reuseport', 1);
}
// Create an Internet or Unix domain server socket.
$this->mainSocket = stream_socket_server($localSocket, $errNo, $errMsg, $flags, $this->socketContext);
if (!$this->mainSocket) {
throw new RuntimeException($errMsg);
}
if ($this->transport === 'ssl') {
stream_socket_enable_crypto($this->mainSocket, false);
} elseif ($this->transport === 'unix') {
$socketFile = substr($localSocket, 7);
if ($this->user) {
chown($socketFile, $this->user);
}
if ($this->group) {
chgrp($socketFile, $this->group);
}
}
// Try to open keepalive for tcp and disable Nagle algorithm.
if (function_exists('socket_import_stream') && self::BUILD_IN_TRANSPORTS[$this->transport] === 'tcp') {
set_error_handler(static fn (): bool => true);
$socket = socket_import_stream($this->mainSocket);
socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1);
if (defined('TCP_KEEPIDLE') && defined('TCP_KEEPINTVL') && defined('TCP_KEEPCNT')) {
socket_set_option($socket, SOL_TCP, TCP_KEEPIDLE, TcpConnection::TCP_KEEPALIVE_INTERVAL);
socket_set_option($socket, SOL_TCP, TCP_KEEPINTVL, TcpConnection::TCP_KEEPALIVE_INTERVAL);
socket_set_option($socket, SOL_TCP, TCP_KEEPCNT, 1);
}
restore_error_handler();
}
// Non blocking.
stream_set_blocking($this->mainSocket, false);
}
if ($autoAccept) {
$this->resumeAccept();
}
}
接受连接:
public function resumeAccept(): void
{
// Register a listener to be notified when server socket is ready to read.
if (static::$globalEvent !== null && ($this->pauseAccept === null || $this->pauseAccept === true) && $this->mainSocket !== null) {
if ($this->transport !== 'udp') {
static::$globalEvent->onReadable($this->mainSocket, $this->acceptTcpConnection(...));
} else {
static::$globalEvent->onReadable($this->mainSocket, $this->acceptUdpConnection(...));
}
$this->pauseAccept = false;
}
}
连接管理
TcpConnection 核心功能
位置:src/Connection/TcpConnection.php
职责:
- 管理单个 TCP 连接的生命周期
- 处理数据收发(读写缓冲区)
- 协议解析(通过 Protocol)
- 事件回调(onMessage、onClose 等)
连接创建
protected function acceptTcpConnection(mixed $socket): void
{
// Accept a connection on server socket.
set_error_handler(static fn (): bool => true);
$newSocket = stream_socket_accept($socket, 0, $remoteAddress);
restore_error_handler();
// Thundering herd.
if (!$newSocket) {
return;
}
// TcpConnection.
$connection = new TcpConnection(static::$globalEvent, $newSocket, $remoteAddress);
$this->connections[$connection->id] = $connection;
$connection->worker = $this;
$connection->protocol = $this->protocol;
$connection->transport = $this->transport;
$connection->onMessage = $this->onMessage;
$connection->onClose = $this->onClose;
$connection->onError = $this->onError;
$connection->onBufferDrain = $this->onBufferDrain;
$connection->onBufferFull = $this->onBufferFull;
// Try to emit onConnect callback.
if ($this->onConnect) {
try {
($this->onConnect)($connection);
} catch (Throwable $e) {
static::stopAll(250, $e);
}
}
}
数据读取
public function baseRead($socket, bool $checkEof = true): void
{
static $requests = [];
// SSL handshake.
if ($this->transport === 'ssl' && $this->sslHandshakeCompleted !== true) {
if ($this->doSslHandshake($socket)) {
$this->sslHandshakeCompleted = true;
if ($this->sendBuffer) {
$this->eventLoop->onWritable($socket, $this->baseWrite(...));
}
} else {
return;
}
}
$buffer = '';
try {
$buffer = @fread($socket, self::READ_BUFFER_SIZE);
} catch (Throwable) {
// do nothing
}
// Check connection closed.
if ($buffer === '' || $buffer === false) {
if ($checkEof && (!is_resource($socket) || feof($socket) || $buffer === false)) {
$this->destroy();
return;
}
} else {
$this->bytesRead += strlen($buffer);
if ($this->status === self::STATUS_ENDING) {
return;
}
if ($this->recvBuffer === '') {
if (!isset($buffer[static::MAX_CACHE_STRING_LENGTH]) && isset($requests[$buffer])) {
++self::$statistics['total_request'];
if ($this->protocol === Http::class) {
$request = $requests[$buffer];
$request->connection = $this;
try {
($this->onMessage)($this, $request);
} catch (Throwable $e) {
$this->error($e);
}
$request = clone $request;
$request->destroy();
$requests[$buffer] = $request;
return;
}
$request = $requests[$buffer];
try {
($this->onMessage)($this, $request);
} catch (Throwable $e) {
$this->error($e);
}
return;
}
$this->recvBuffer = $buffer;
} else {
$this->recvBuffer .= $buffer;
}
}
// If the application layer protocol has been set up.
if ($this->protocol !== null) {
while ($this->recvBuffer !== '' && !$this->isPaused) {
// The current packet length is known.
if ($this->currentPackageLength) {
// Data is not enough for a package.
if ($this->currentPackageLength > strlen($this->recvBuffer)) {
break;
}
} else {
// Get current package length.
try {
$this->currentPackageLength = $this->protocol::input($this->recvBuffer, $this);
} catch (Throwable $e) {
$this->currentPackageLength = -1;
Worker::safeEcho((string)$e);
}
// The packet length is unknown.
if ($this->currentPackageLength === 0) {
break;
} elseif ($this->currentPackageLength > 0 && $this->currentPackageLength <= $this->maxPackageSize) {
// Data is not enough for a package.
if ($this->currentPackageLength > strlen($this->recvBuffer)) {
break;
}
} // Wrong package.
else {
Worker::safeEcho((string)(new RuntimeException("Protocol $this->protocol Error package. package_length=" . var_export($this->currentPackageLength, true))));
$this->destroy();
return;
}
}
// The data is enough for a packet.
++self::$statistics['total_request'];
// The current packet length is equal to the length of the buffer.
if ($one = (strlen($this->recvBuffer) === $this->currentPackageLength)) {
$oneRequestBuffer = $this->recvBuffer;
$this->recvBuffer = '';
} else {
// Get a full package from the buffer.
$oneRequestBuffer = substr($this->recvBuffer, 0, $this->currentPackageLength);
// Remove the current package from receive buffer.
$this->recvBuffer = substr($this->recvBuffer, $this->currentPackageLength);
}
// Reset the current packet length to 0.
$this->currentPackageLength = 0;
try {
// Decode request buffer before Emitting onMessage callback.
$request = $this->protocol::decode($oneRequestBuffer, $this);
if ((!is_object($request) || $request instanceof Request) && $one && !isset($oneRequestBuffer[static::MAX_CACHE_STRING_LENGTH])) {
($this->onMessage)($this, $request);
if ($request instanceof Request) {
$request = clone $request;
$request->destroy();
}
$requests[$oneRequestBuffer] = $request;
if (count($requests) > static::MAX_CACHE_SIZE) {
unset($requests[key($requests)]);
}
return;
}
($this->onMessage)($this, $request);
} catch (Throwable $e) {
$this->error($e);
}
}
return;
}
if ($this->recvBuffer === '' || $this->isPaused) {
return;
}
// Application protocol is not set.
++self::$statistics['total_request'];
try {
($this->onMessage)($this, $this->recvBuffer);
} catch (Throwable $e) {
$this->error($e);
}
// Clean receive buffer.
$this->recvBuffer = '';
}
读取流程:
- 检查 SSL 握手
- 从 Socket 读取数据到缓冲区
- 如果有协议,调用
protocol::input()判断包长度 - 数据足够时,调用
protocol::decode()解码 - 触发
onMessage回调
数据发送
public function send(mixed $sendBuffer, bool $raw = false): bool|null
{
if ($this->status === self::STATUS_ENDING || $this->status === self::STATUS_CLOSING || $this->status === self::STATUS_CLOSED) {
return false;
}
// Try to call protocol::encode($sendBuffer) before sending.
if (false === $raw && $this->protocol !== null) {
try {
$sendBuffer = $this->protocol::encode($sendBuffer, $this);
} catch(Throwable $e) {
$this->error($e);
}
if ($sendBuffer === '') {
return null;
}
}
if ($this->status !== self::STATUS_ESTABLISHED ||
($this->transport === 'ssl' && $this->sslHandshakeCompleted !== true)
) {
if ($this->sendBuffer && $this->bufferIsFull()) {
++self::$statistics['send_fail'];
return false;
}
$this->sendBuffer .= $sendBuffer;
$this->checkBufferWillFull();
return null;
}
// Attempt to send data directly.
if ($this->sendBuffer === '') {
if ($this->transport === 'ssl') {
$this->eventLoop->onWritable($this->socket, $this->baseWrite(...));
$this->sendBuffer = $sendBuffer;
$this->checkBufferWillFull();
return null;
}
$len = 0;
try {
$len = @fwrite($this->socket, $sendBuffer);
} catch (Throwable $e) {
Worker::log($e);
}
// send successful.
if ($len === strlen($sendBuffer)) {
$this->bytesWritten += $len;
return true;
}
// Send only part of the data.
if ($len > 0) {
$this->sendBuffer = substr($sendBuffer, $len);
$this->bytesWritten += $len;
} else {
// Connection closed?
if (!is_resource($this->socket) || feof($this->socket)) {
++self::$statistics['send_fail'];
if ($this->onError) {
try {
($this->onError)($this, static::SEND_FAIL, 'client closed');
} catch (Throwable $e) {
$this->error($e);
}
}
$this->destroy();
return false;
}
$this->sendBuffer = $sendBuffer;
}
$this->eventLoop->onWritable($this->socket, $this->baseWrite(...));
// Check if send buffer will be full.
$this->checkBufferWillFull();
return null;
}
if ($this->bufferIsFull()) {
++self::$statistics['send_fail'];
return false;
}
$this->sendBuffer .= $sendBuffer;
// Check if send buffer is full.
$this->checkBufferWillFull();
return null;
}
发送流程:
- 协议编码(如果有)
- 尝试直接发送
- 如果部分发送,剩余数据加入发送缓冲区
- 注册可写事件,等待 Socket 可写时继续发送
协议处理
ProtocolInterface 接口
interface ProtocolInterface
{
/**
* Check the integrity of the package.
* Please return the length of package.
* If length is unknown please return 0 that means waiting for more data.
* If the package has something wrong please return -1 the connection will be closed.
*
* @param string $buffer
* @param ConnectionInterface $connection
* @return int
*/
public static function input(string $buffer, ConnectionInterface $connection): int;
/**
* Decode package and emit onMessage($message) callback, $message is the result that decode returned.
*
* @param string $buffer
* @param ConnectionInterface $connection
* @return mixed
*/
public static function decode(string $buffer, ConnectionInterface $connection): mixed;
/**
* Encode package before sending to client.
*
* @param mixed $data
* @param ConnectionInterface $connection
* @return string
*/
public static function encode(mixed $data, ConnectionInterface $connection): string;
}
协议工作流程:
-
input()- 检查缓冲区,返回完整包长度(0=数据不足,>0=包长度,-1=错误) -
decode()- 解码完整包,返回业务数据 -
encode()- 编码业务数据为协议格式
定时器系统
Timer 类
位置:src/Timer.php
功能:
- 延迟执行:
Timer::delay() - 重复执行:
Timer::repeat() - 删除定时器:
Timer::del()
实现原理:
public static function repeat(float $timeInterval, callable $func, array $args = []): int
{
return self::$event->repeat($timeInterval, $func, $args);
}
/**
* Delay.
*
* @param float $timeInterval
* @param callable $func
* @param array $args
* @return int
*/
public static function delay(float $timeInterval, callable $func, array $args = []): int
{
return self::$event->delay($timeInterval, $func, $args);
}
Timer 是对 EventLoop 定时器功能的封装,实际由 EventLoop 实现。
启动流程
完整启动序列图
用户调用 Worker::runAll()
│
├─> checkSapiEnv() # 检查 CLI 环境
├─> init() # 初始化全局变量、EventLoop
├─> parseCommand() # 解析命令(start/stop/reload)
├─> daemonize() # 守护进程化(可选)
├─> initWorkers() # 初始化 Worker 实例
├─> installSignal() # 安装信号处理器
├─> forkWorkers() # Fork 子进程
│ │
│ └─> 子进程执行:
│ ├─> 创建 EventLoop
│ ├─> worker->run()
│ └─> eventLoop->run() # 进入事件循环
│
└─> monitorWorkers() # 主进程监控子进程
信号处理
支持的信号
protected static function installSignal(): void
{
if (DIRECTORY_SEPARATOR !== '/') {
return;
}
$signals = [SIGINT, SIGTERM, SIGHUP, SIGTSTP, SIGQUIT, SIGUSR1, SIGUSR2, SIGIOT, SIGIO];
foreach ($signals as $signal) {
pcntl_signal($signal, static::signalHandler(...), false);
}
// ignore
pcntl_signal(SIGPIPE, SIG_IGN, false);
}
信号处理逻辑
protected static function signalHandler(int $signal): void
{
switch ($signal) {
// Stop.
case SIGINT:
case SIGTERM:
case SIGHUP:
case SIGTSTP:
static::$gracefulStop = false;
static::stopAll(0, 'received signal ' . static::getSignalName($signal));
break;
// Graceful stop.
case SIGQUIT:
static::$gracefulStop = true;
static::stopAll(0, 'received signal ' . static::getSignalName($signal));
break;
// Reload.
case SIGUSR2:
case SIGUSR1:
if (static::$status === static::STATUS_RELOADING || static::$status === static::STATUS_SHUTDOWN) {
return;
}
static::$gracefulStop = $signal === SIGUSR2;
static::$pidsToRestart = static::getAllWorkerPids();
static::reload();
break;
// Show status.
case SIGIOT:
static::writeStatisticsToStatusFile();
break;
// Show connection status.
case SIGIO:
static::writeConnectionsStatisticsToStatusFile();
break;
}
}
信号说明:
-
SIGINT/SIGTERM- 立即停止 -
SIGQUIT- 优雅停止(等待连接关闭) -
SIGUSR1- 普通重载 -
SIGUSR2- 优雅重载 -
SIGIOT- 显示状态 -
SIGIO- 显示连接状态
总结
核心设计模式
- 事件驱动模式:基于 EventLoop 实现异步非阻塞
- 多进程模型:Master-Worker 架构,提高并发能力
- 策略模式:多种 EventLoop 实现可插拔
- 观察者模式:通过回调函数处理事件
性能优化点
- 非阻塞 I/O:所有 Socket 设置为非阻塞
- 事件复用:EventLoop 统一管理所有 I/O 事件
- 缓冲区管理:读写缓冲区减少系统调用
- 协议缓存:HTTP 请求对象缓存(减少对象创建)
关键文件索引
-
src/Worker.php- 核心进程管理 -
src/Events/*.php- EventLoop 实现 -
src/Connection/TcpConnection.php- TCP 连接管理 -
src/Timer.php- 定时器系统 -
src/Protocols/*.php- 协议实现