Workerman 源码分析

Workerman 源码业务逻辑分析

目录

  1. 项目概述
  2. 核心架构
  3. EventLoop 事件循环详解
  4. Worker 进程管理
  5. 连接管理
  6. 协议处理
  7. 定时器系统
  8. 启动流程
  9. 信号处理

项目概述

Workerman 是一个高性能的 PHP Socket 服务器框架,采用事件驱动、非阻塞 I/O 模型。核心特点:

  • 多进程架构:Master-Worker 进程模型
  • 事件驱动:基于 EventLoop 实现异步非阻塞
  • 多协议支持:TCP、UDP、HTTP、WebSocket 等
  • 高并发:单进程可处理数万连接

核心架构

架构层次

┌─────────────────────────────────────┐
│         Worker (进程管理)            │
│  - 进程fork/监控                     │
│  - Socket监听                        │
│  - 信号处理                          │
└──────────────┬──────────────────────┘
               │
┌──────────────▼──────────────────────┐
│      EventLoop (事件循环)            │
│  - Select/Event/Swoole/Swow/Fiber  │
│  - 事件注册/分发                     │
└──────────────┬──────────────────────┘
               │
┌──────────────▼──────────────────────┐
│    Connection (连接管理)             │
│  - TcpConnection                     │
│  - UdpConnection                     │
│  - AsyncTcpConnection               │
└──────────────┬──────────────────────┘
               │
┌──────────────▼──────────────────────┐
│    Protocol (协议处理)               │
│  - Http/WebSocket/Text/Frame        │
└─────────────────────────────────────┘

EventLoop 事件循环详解

EventLoop 是 Workerman 的核心,负责监听和分发 I/O 事件。Workerman 支持多种 EventLoop 实现,通过 EventInterface 统一接口。

EventInterface 接口定义

// src/Events/EventInterface.php
interface EventInterface
{
    // 延迟执行
    public function delay(float $delay, callable $func, array $args = []): int;
    
    // 重复执行
    public function repeat(float $interval, callable $func, array $args = []): int;
    
    // 监听可读事件
    public function onReadable($stream, callable $func): void;
    
    // 监听可写事件
    public function onWritable($stream, callable $func): void;
    
    // 监听信号
    public function onSignal(int $signal, callable $func): void;
    
    // 运行事件循环
    public function run(): void;
    
    // 停止事件循环
    public function stop(): void;
}

EventLoop 实现选择

Worker::initGlobalEvent() 中自动选择:

    protected static function initGlobalEvent(): void
    {
        if (static::$globalEvent !== null) {
            static::$eventLoopClass = get_class(static::$globalEvent);
            static::$globalEvent = null;
            return;
        }

        if (!empty(static::$eventLoopClass)) {
            if (!is_subclass_of(static::$eventLoopClass, EventInterface::class)) {
                throw new RuntimeException(sprintf('%s::$eventLoopClass must implement %s', static::class, EventInterface::class));
            }
            return;
        }

        static::$eventLoopClass = match (true) {
            extension_loaded('event') => Event::class,
            default => Select::class,
        };
    }

优先级

  1. 如果安装了 event 扩展 → 使用 Event (libevent)
  2. 否则 → 使用 Select (PHP stream_select)

1. Select EventLoop(默认实现)

位置src/Events/Select.php

实现原理:基于 PHP 的 stream_select() 系统调用

核心代码解析

    public function run(): void
    {
        while ($this->running) {
            $read = $this->readFds;
            $write = $this->writeFds;
            $except = $this->exceptFds;
            if ($read || $write || $except) {
                // Waiting read/write/signal/timeout events.
                try {
                    @stream_select($read, $write, $except, 0, $this->selectTimeout);
                } catch (Throwable) {
                    // do nothing
                }
            } else {
                $this->selectTimeout >= 1 && usleep($this->selectTimeout);
            }

            foreach ($read as $fd) {
                $fdKey = (int)$fd;
                if (isset($this->readEvents[$fdKey])) {
                    $this->readEvents[$fdKey]($fd);
                }
            }

            foreach ($write as $fd) {
                $fdKey = (int)$fd;
                if (isset($this->writeEvents[$fdKey])) {
                    $this->writeEvents[$fdKey]($fd);
                }
            }

            foreach ($except as $fd) {
                $fdKey = (int)$fd;
                if (isset($this->exceptEvents[$fdKey])) {
                    $this->exceptEvents[$fdKey]($fd);
                }
            }

            if ($this->nextTickTime > 0) {
                if (microtime(true) >= $this->nextTickTime) {
                    $this->tick();
                } else {
                    $this->selectTimeout = (int)(($this->nextTickTime - microtime(true)) * 1000000);
                }
            }

            // The $this->signalEvents are empty under Windows, make sure not to call pcntl_signal_dispatch.
            if ($this->signalEvents) {
                // Calls signal handlers for pending signals
                pcntl_signal_dispatch();
            }
        }
    }

特点

  • ✅ 纯 PHP 实现,无需扩展
  • ✅ 跨平台(Windows/Linux)
  • ❌ 连接数限制:Linux 1024,Windows 256
  • ❌ 性能相对较低

定时器实现:使用 SplPriorityQueue 优先级队列管理定时任务

    protected function tick(): void
    {
        $tasksToInsert = [];
        while (!$this->scheduler->isEmpty()) {
            $schedulerData = $this->scheduler->top();
            $timerId = $schedulerData['data'];
            $nextRunTime = -$schedulerData['priority'];
            $timeNow = microtime(true);
            $this->selectTimeout = (int)(($nextRunTime - $timeNow) * 1000000);

            if ($this->selectTimeout <= 0) {
                $this->scheduler->extract();

                if (!isset($this->eventTimer[$timerId])) {
                    continue;
                }

                // [func, args, timer_interval]
                $taskData = $this->eventTimer[$timerId];
                if (isset($taskData[2])) {
                    $nextRunTime = $timeNow + $taskData[2];
                    $tasksToInsert[] = [$timerId, -$nextRunTime];
                } else {
                    unset($this->eventTimer[$timerId]);
                }
                $this->safeCall($taskData[0], $taskData[1]);
            } else {
                break;
            }
        }
        foreach ($tasksToInsert as $item) {
            $this->scheduler->insert($item[0], $item[1]);
        }
        if (!$this->scheduler->isEmpty()) {
            $schedulerData = $this->scheduler->top();
            $nextRunTime = -$schedulerData['priority'];
            $this->setNextTickTime($nextRunTime);
            return;
        }
        $this->setNextTickTime(0);
    }

2. Event EventLoop(libevent)

位置src/Events/Event.php

实现原理:基于 libevent 扩展(高性能 C 库)

核心代码解析

    public function run(): void
    {
        $this->eventBase->loop();
    }

特点

  • ✅ 高性能(C 实现)
  • ✅ 支持大量连接(无 1024 限制)
  • ❌ 需要安装 event 扩展
  • ❌ Windows 支持有限

事件注册示例

    public function onReadable($stream, callable $func): void
    {
        $className = $this->eventClassName;
        $fdKey = (int)$stream;
        $event = new $className($this->eventBase, $stream, $className::READ | $className::PERSIST, $func);
        if ($event->add()) {
            $this->readEvents[$fdKey] = $event;
        }
    }

3. Swoole EventLoop

位置src/Events/Swoole.php

实现原理:基于 Swoole 扩展的事件循环

核心代码解析

    public function run(): void
    {
        // Avoid process exit due to no listening
        Timer::tick(100000000, static fn() => null);
        Event::wait();
    }

特点

  • ✅ 协程支持(自动 Hook)
  • ✅ 高性能
  • ❌ 需要安装 Swoole 扩展

协程 Hook

    public function __construct()
    {
        Coroutine::set(['hook_flags' => SWOOLE_HOOK_ALL]);
    }

4. Swow EventLoop

位置src/Events/Swow.php

实现原理:基于 Swow 扩展的协程事件循环

核心代码解析

    public function onReadable($stream, callable $func): void
    {
        $fd = (int)$stream;
        if (isset($this->readEvents[$fd])) {
            $this->offReadable($stream);
        }
        Coroutine::run(function () use ($stream, $func, $fd): void {
            try {
                $this->readEvents[$fd] = Coroutine::getCurrent();
                while (true) {
                    if (!is_resource($stream)) {
                        $this->offReadable($stream);
                        break;
                    }
                    // Under Windows, setting a timeout is necessary; otherwise, the accept cannot be listened to.
                    // Setting it to 1000ms will result in a 1-second delay for the first accept under Windows.
                    if (!isset($this->readEvents[$fd]) || $this->readEvents[$fd] !== Coroutine::getCurrent()) {
                        break;
                    }
                    $rEvent = stream_poll_one($stream, STREAM_POLLIN | STREAM_POLLHUP, 1000);
                    if ($rEvent !== STREAM_POLLNONE) {
                        $this->safeCall($func, [$stream]);
                    }
                    if ($rEvent !== STREAM_POLLIN && $rEvent !== STREAM_POLLNONE) {
                        $this->offReadable($stream);
                        break;
                    }
                }
            } catch (RuntimeException) {
                $this->offReadable($stream);
            }
        });
    }

特点

  • ✅ 原生协程支持
  • ✅ 高性能
  • ❌ 需要安装 Swow 扩展

5. Fiber EventLoop(Revolt)

位置src/Events/Fiber.php

实现原理:基于 Revolt EventLoop(PHP 8.1+ Fiber)

核心代码解析

    public function run(): void
    {
        $this->driver->run();
    }

特点

  • ✅ 纯 PHP 实现(PHP 8.1+)
  • ✅ 协程支持
  • ❌ 需要 PHP 8.1+

6. Ev EventLoop

位置src/Events/Ev.php

实现原理:基于 libev 扩展

特点

  • ✅ 高性能
  • ❌ 需要安装 ev 扩展

Worker 进程管理

Worker 类核心职责

  1. 进程管理:Master-Worker 模型
  2. Socket 监听:创建和监听服务器 Socket
  3. 信号处理:处理进程信号(reload、stop 等)
  4. 事件循环初始化:创建全局 EventLoop

启动流程

    public static function runAll(): void
    {
        try {
            static::checkSapiEnv();
            static::initStdOut();
            static::init();
            static::parseCommand();
            static::checkPortAvailable();
            static::lock();
            static::daemonize();
            static::initWorkers();
            static::installSignal();
            static::saveMasterPid();
            static::lock(LOCK_UN);
            static::displayUI();
            static::forkWorkers();
            static::resetStd();
            static::monitorWorkers();
        } catch (Throwable $e) {
            static::log($e);
        }
    }

流程说明

  1. checkSapiEnv() - 检查运行环境(必须是 CLI)
  2. init() - 初始化全局变量、EventLoop
  3. parseCommand() - 解析命令行参数(start/stop/reload)
  4. daemonize() - 守护进程化(可选)
  5. initWorkers() - 初始化所有 Worker 实例
  6. forkWorkers() - Fork 子进程
  7. monitorWorkers() - 监控子进程(主进程循环)

进程 Fork

Linux 实现

    protected static function forkOneWorkerForLinux(self $worker): void
    {
        // Get available worker id.
        $id = static::getId($worker->workerId, 0);
        $pid = pcntl_fork();
        // For master process.
        if ($pid > 0) {
            static::$pidMap[$worker->workerId][$pid] = $pid;
            static::$idMap[$worker->workerId][$id] = $pid;
        } // For child processes.
        elseif (0 === $pid) {
            srand();
            mt_srand();
            static::$gracefulStop = false;
            if (static::$status === static::STATUS_STARTING) {
                static::resetStd();
            }
            static::$pidsToRestart = static::$pidMap = [];
            // Remove other listener.
            foreach (static::$workers as $key => $oneWorker) {
                if ($oneWorker->workerId !== $worker->workerId) {
                    $oneWorker->unlisten();
                    unset(static::$workers[$key]);
                }
            }
            Timer::delAll();

            //Update process state.
            static::$status = static::STATUS_RUNNING;

            // Register shutdown function for checking errors.
            register_shutdown_function(static::checkErrors(...));

            // Create a global event loop.
            if (static::$globalEvent === null) {
                static::$eventLoopClass = $worker->eventLoop ?: static::$eventLoopClass;
                static::$globalEvent = new static::$eventLoopClass();
                static::$globalEvent->setErrorHandler(function ($exception) {
                    static::stopAll(250, $exception);
                });
            }

            // Reinstall signal.
            static::reinstallSignal();

            // Init Timer.
            Timer::init(static::$globalEvent);

            restore_error_handler();

            static::setProcessTitle('WorkerMan: worker process  ' . $worker->name . ' ' . $worker->getSocketName());
            $worker->setUserAndGroup();
            $worker->id = $id;
            $worker->run();
            // Main loop.
            static::$globalEvent->run();
            if (static::$status !== self::STATUS_SHUTDOWN) {
                $err = new Exception('event-loop exited');
                static::log($err);
                exit(250);
            }
            exit(0);
        } else {
            throw new RuntimeException("forkOneWorker fail");
        }
    }

关键点

  • 子进程创建独立的 EventLoop
  • 子进程只保留自己的 Worker 实例
  • 子进程进入 $globalEvent->run() 事件循环

Socket 监听

    public function listen(bool $autoAccept = true): void
    {
        if (!$this->socketName) {
            return;
        }

        if (!$this->mainSocket) {

            $localSocket = $this->parseSocketAddress();

            // Flag.
            $flags = $this->transport === 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN;
            $errNo = 0;
            $errMsg = '';
            // SO_REUSEPORT.
            if ($this->reusePort && DIRECTORY_SEPARATOR !== '\\') {
                stream_context_set_option($this->socketContext, 'socket', 'so_reuseport', 1);
            }

            // Create an Internet or Unix domain server socket.
            $this->mainSocket = stream_socket_server($localSocket, $errNo, $errMsg, $flags, $this->socketContext);
            if (!$this->mainSocket) {
                throw new RuntimeException($errMsg);
            }

            if ($this->transport === 'ssl') {
                stream_socket_enable_crypto($this->mainSocket, false);
            } elseif ($this->transport === 'unix') {
                $socketFile = substr($localSocket, 7);
                if ($this->user) {
                    chown($socketFile, $this->user);
                }
                if ($this->group) {
                    chgrp($socketFile, $this->group);
                }
            }

            // Try to open keepalive for tcp and disable Nagle algorithm.
            if (function_exists('socket_import_stream') && self::BUILD_IN_TRANSPORTS[$this->transport] === 'tcp') {
                set_error_handler(static fn (): bool => true);
                $socket = socket_import_stream($this->mainSocket);
                socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
                socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1);
                if (defined('TCP_KEEPIDLE') && defined('TCP_KEEPINTVL') && defined('TCP_KEEPCNT')) {
                    socket_set_option($socket, SOL_TCP, TCP_KEEPIDLE, TcpConnection::TCP_KEEPALIVE_INTERVAL);
                    socket_set_option($socket, SOL_TCP, TCP_KEEPINTVL, TcpConnection::TCP_KEEPALIVE_INTERVAL);
                    socket_set_option($socket, SOL_TCP, TCP_KEEPCNT, 1);
                }
                restore_error_handler();
            }

            // Non blocking.
            stream_set_blocking($this->mainSocket, false);
        }

        if ($autoAccept) {
            $this->resumeAccept();
        }
    }

接受连接

    public function resumeAccept(): void
    {
        // Register a listener to be notified when server socket is ready to read.
        if (static::$globalEvent !== null && ($this->pauseAccept === null || $this->pauseAccept === true) && $this->mainSocket !== null) {
            if ($this->transport !== 'udp') {
                static::$globalEvent->onReadable($this->mainSocket, $this->acceptTcpConnection(...));
            } else {
                static::$globalEvent->onReadable($this->mainSocket, $this->acceptUdpConnection(...));
            }
            $this->pauseAccept = false;
        }
    }

连接管理

TcpConnection 核心功能

位置src/Connection/TcpConnection.php

职责

  1. 管理单个 TCP 连接的生命周期
  2. 处理数据收发(读写缓冲区)
  3. 协议解析(通过 Protocol)
  4. 事件回调(onMessage、onClose 等)

连接创建

    protected function acceptTcpConnection(mixed $socket): void
    {
        // Accept a connection on server socket.
        set_error_handler(static fn (): bool => true);
        $newSocket = stream_socket_accept($socket, 0, $remoteAddress);
        restore_error_handler();

        // Thundering herd.
        if (!$newSocket) {
            return;
        }

        // TcpConnection.
        $connection = new TcpConnection(static::$globalEvent, $newSocket, $remoteAddress);
        $this->connections[$connection->id] = $connection;
        $connection->worker = $this;
        $connection->protocol = $this->protocol;
        $connection->transport = $this->transport;
        $connection->onMessage = $this->onMessage;
        $connection->onClose = $this->onClose;
        $connection->onError = $this->onError;
        $connection->onBufferDrain = $this->onBufferDrain;
        $connection->onBufferFull = $this->onBufferFull;

        // Try to emit onConnect callback.
        if ($this->onConnect) {
            try {
                ($this->onConnect)($connection);
            } catch (Throwable $e) {
                static::stopAll(250, $e);
            }
        }
    }

数据读取

    public function baseRead($socket, bool $checkEof = true): void
    {
        static $requests = [];
        // SSL handshake.
        if ($this->transport === 'ssl' && $this->sslHandshakeCompleted !== true) {
            if ($this->doSslHandshake($socket)) {
                $this->sslHandshakeCompleted = true;
                if ($this->sendBuffer) {
                    $this->eventLoop->onWritable($socket, $this->baseWrite(...));
                }
            } else {
                return;
            }
        }

        $buffer = '';
        try {
            $buffer = @fread($socket, self::READ_BUFFER_SIZE);
        } catch (Throwable) {
            // do nothing
        }

        // Check connection closed.
        if ($buffer === '' || $buffer === false) {
            if ($checkEof && (!is_resource($socket) || feof($socket) || $buffer === false)) {
                $this->destroy();
                return;
            }
        } else {
            $this->bytesRead += strlen($buffer);
            if ($this->status === self::STATUS_ENDING) {
                return;
            }
            if ($this->recvBuffer === '') {
                if (!isset($buffer[static::MAX_CACHE_STRING_LENGTH]) && isset($requests[$buffer])) {
                    ++self::$statistics['total_request'];
                    if ($this->protocol === Http::class) {
                        $request = $requests[$buffer];
                        $request->connection = $this;
                        try {
                            ($this->onMessage)($this, $request);
                        } catch (Throwable $e) {
                            $this->error($e);
                        }
                        $request = clone $request;
                        $request->destroy();
                        $requests[$buffer] = $request;
                        return;
                    }
                    $request = $requests[$buffer];
                    try {
                        ($this->onMessage)($this, $request);
                    } catch (Throwable $e) {
                        $this->error($e);
                    }
                    return;
                }
                $this->recvBuffer = $buffer;
            } else {
                $this->recvBuffer .= $buffer;
            }
        }

        // If the application layer protocol has been set up.
        if ($this->protocol !== null) {
            while ($this->recvBuffer !== '' && !$this->isPaused) {
                // The current packet length is known.
                if ($this->currentPackageLength) {
                    // Data is not enough for a package.
                    if ($this->currentPackageLength > strlen($this->recvBuffer)) {
                        break;
                    }
                } else {
                    // Get current package length.
                    try {
                        $this->currentPackageLength = $this->protocol::input($this->recvBuffer, $this);
                    } catch (Throwable $e) {
                        $this->currentPackageLength = -1;
                        Worker::safeEcho((string)$e);
                    }
                    // The packet length is unknown.
                    if ($this->currentPackageLength === 0) {
                        break;
                    } elseif ($this->currentPackageLength > 0 && $this->currentPackageLength <= $this->maxPackageSize) {
                        // Data is not enough for a package.
                        if ($this->currentPackageLength > strlen($this->recvBuffer)) {
                            break;
                        }
                    } // Wrong package.
                    else {
                        Worker::safeEcho((string)(new RuntimeException("Protocol $this->protocol Error package. package_length=" . var_export($this->currentPackageLength, true))));
                        $this->destroy();
                        return;
                    }
                }

                // The data is enough for a packet.
                ++self::$statistics['total_request'];
                // The current packet length is equal to the length of the buffer.
                if ($one = (strlen($this->recvBuffer) === $this->currentPackageLength)) {
                    $oneRequestBuffer = $this->recvBuffer;
                    $this->recvBuffer = '';
                } else {
                    // Get a full package from the buffer.
                    $oneRequestBuffer = substr($this->recvBuffer, 0, $this->currentPackageLength);
                    // Remove the current package from receive buffer.
                    $this->recvBuffer = substr($this->recvBuffer, $this->currentPackageLength);
                }
                // Reset the current packet length to 0.
                $this->currentPackageLength = 0;
                try {
                    // Decode request buffer before Emitting onMessage callback.
                    $request = $this->protocol::decode($oneRequestBuffer, $this);
                    if ((!is_object($request) || $request instanceof Request) && $one && !isset($oneRequestBuffer[static::MAX_CACHE_STRING_LENGTH])) {
                        ($this->onMessage)($this, $request);
                        if ($request instanceof Request) {
                            $request = clone $request;
                            $request->destroy();
                        }
                        $requests[$oneRequestBuffer] = $request;
                        if (count($requests) > static::MAX_CACHE_SIZE) {
                            unset($requests[key($requests)]);
                        }
                        return;
                    }
                    ($this->onMessage)($this, $request);
                } catch (Throwable $e) {
                    $this->error($e);
                }
            }
            return;
        }

        if ($this->recvBuffer === '' || $this->isPaused) {
            return;
        }

        // Application protocol is not set.
        ++self::$statistics['total_request'];
        try {
            ($this->onMessage)($this, $this->recvBuffer);
        } catch (Throwable $e) {
            $this->error($e);
        }
        // Clean receive buffer.
        $this->recvBuffer = '';
    }

读取流程

  1. 检查 SSL 握手
  2. 从 Socket 读取数据到缓冲区
  3. 如果有协议,调用 protocol::input() 判断包长度
  4. 数据足够时,调用 protocol::decode() 解码
  5. 触发 onMessage 回调

数据发送

    public function send(mixed $sendBuffer, bool $raw = false): bool|null
    {
        if ($this->status === self::STATUS_ENDING || $this->status === self::STATUS_CLOSING || $this->status === self::STATUS_CLOSED) {
            return false;
        }

        // Try to call protocol::encode($sendBuffer) before sending.
        if (false === $raw && $this->protocol !== null) {
            try {
                $sendBuffer = $this->protocol::encode($sendBuffer, $this);
            } catch(Throwable $e) {
                $this->error($e);
            }
            if ($sendBuffer === '') {
                return null;
            }
        }

        if ($this->status !== self::STATUS_ESTABLISHED ||
            ($this->transport === 'ssl' && $this->sslHandshakeCompleted !== true)
        ) {
            if ($this->sendBuffer && $this->bufferIsFull()) {
                ++self::$statistics['send_fail'];
                return false;
            }
            $this->sendBuffer .= $sendBuffer;
            $this->checkBufferWillFull();
            return null;
        }

        // Attempt to send data directly.
        if ($this->sendBuffer === '') {
            if ($this->transport === 'ssl') {
                $this->eventLoop->onWritable($this->socket, $this->baseWrite(...));
                $this->sendBuffer = $sendBuffer;
                $this->checkBufferWillFull();
                return null;
            }
            $len = 0;
            try {
                $len = @fwrite($this->socket, $sendBuffer);
            } catch (Throwable $e) {
                Worker::log($e);
            }
            // send successful.
            if ($len === strlen($sendBuffer)) {
                $this->bytesWritten += $len;
                return true;
            }
            // Send only part of the data.
            if ($len > 0) {
                $this->sendBuffer = substr($sendBuffer, $len);
                $this->bytesWritten += $len;
            } else {
                // Connection closed?
                if (!is_resource($this->socket) || feof($this->socket)) {
                    ++self::$statistics['send_fail'];
                    if ($this->onError) {
                        try {
                            ($this->onError)($this, static::SEND_FAIL, 'client closed');
                        } catch (Throwable $e) {
                            $this->error($e);
                        }
                    }
                    $this->destroy();
                    return false;
                }
                $this->sendBuffer = $sendBuffer;
            }
            $this->eventLoop->onWritable($this->socket, $this->baseWrite(...));
            // Check if send buffer will be full.
            $this->checkBufferWillFull();
            return null;
        }

        if ($this->bufferIsFull()) {
            ++self::$statistics['send_fail'];
            return false;
        }

        $this->sendBuffer .= $sendBuffer;
        // Check if send buffer is full.
        $this->checkBufferWillFull();
        return null;
    }

发送流程

  1. 协议编码(如果有)
  2. 尝试直接发送
  3. 如果部分发送,剩余数据加入发送缓冲区
  4. 注册可写事件,等待 Socket 可写时继续发送

协议处理

ProtocolInterface 接口

interface ProtocolInterface
{
    /**
     * Check the integrity of the package.
     * Please return the length of package.
     * If length is unknown please return 0 that means waiting for more data.
     * If the package has something wrong please return -1 the connection will be closed.
     *
     * @param string $buffer
     * @param ConnectionInterface $connection
     * @return int
     */
    public static function input(string $buffer, ConnectionInterface $connection): int;

    /**
     * Decode package and emit onMessage($message) callback, $message is the result that decode returned.
     *
     * @param string $buffer
     * @param ConnectionInterface $connection
     * @return mixed
     */
    public static function decode(string $buffer, ConnectionInterface $connection): mixed;

    /**
     * Encode package before sending to client.
     *
     * @param mixed $data
     * @param ConnectionInterface $connection
     * @return string
     */
    public static function encode(mixed $data, ConnectionInterface $connection): string;
}

协议工作流程

  1. input() - 检查缓冲区,返回完整包长度(0=数据不足,>0=包长度,-1=错误)
  2. decode() - 解码完整包,返回业务数据
  3. encode() - 编码业务数据为协议格式

定时器系统

Timer 类

位置src/Timer.php

功能

  • 延迟执行:Timer::delay()
  • 重复执行:Timer::repeat()
  • 删除定时器:Timer::del()

实现原理

    public static function repeat(float $timeInterval, callable $func, array $args = []): int
    {
        return self::$event->repeat($timeInterval, $func, $args);
    }

    /**
     * Delay.
     *
     * @param float $timeInterval
     * @param callable $func
     * @param array $args
     * @return int
     */
    public static function delay(float $timeInterval, callable $func, array $args = []): int
    {
        return self::$event->delay($timeInterval, $func, $args);
    }

Timer 是对 EventLoop 定时器功能的封装,实际由 EventLoop 实现。


启动流程

完整启动序列图

用户调用 Worker::runAll()
    │
    ├─> checkSapiEnv()          # 检查 CLI 环境
    ├─> init()                  # 初始化全局变量、EventLoop
    ├─> parseCommand()          # 解析命令(start/stop/reload)
    ├─> daemonize()             # 守护进程化(可选)
    ├─> initWorkers()           # 初始化 Worker 实例
    ├─> installSignal()         # 安装信号处理器
    ├─> forkWorkers()           # Fork 子进程
    │   │
    │   └─> 子进程执行:
    │       ├─> 创建 EventLoop
    │       ├─> worker->run()
    │       └─> eventLoop->run()  # 进入事件循环
    │
    └─> monitorWorkers()        # 主进程监控子进程

信号处理

支持的信号

    protected static function installSignal(): void
    {
        if (DIRECTORY_SEPARATOR !== '/') {
            return;
        }
        $signals = [SIGINT, SIGTERM, SIGHUP, SIGTSTP, SIGQUIT, SIGUSR1, SIGUSR2, SIGIOT, SIGIO];
        foreach ($signals as $signal) {
            pcntl_signal($signal, static::signalHandler(...), false);
        }
        // ignore
        pcntl_signal(SIGPIPE, SIG_IGN, false);
    }

信号处理逻辑

    protected static function signalHandler(int $signal): void
    {
        switch ($signal) {
            // Stop.
            case SIGINT:
            case SIGTERM:
            case SIGHUP:
            case SIGTSTP:
                static::$gracefulStop = false;
                static::stopAll(0, 'received signal ' . static::getSignalName($signal));
                break;
            // Graceful stop.
            case SIGQUIT:
                static::$gracefulStop = true;
                static::stopAll(0, 'received signal ' . static::getSignalName($signal));
                break;
            // Reload.
            case SIGUSR2:
            case SIGUSR1:
                if (static::$status === static::STATUS_RELOADING || static::$status === static::STATUS_SHUTDOWN) {
                    return;
                }
                static::$gracefulStop = $signal === SIGUSR2;
                static::$pidsToRestart = static::getAllWorkerPids();
                static::reload();
                break;
            // Show status.
            case SIGIOT:
                static::writeStatisticsToStatusFile();
                break;
            // Show connection status.
            case SIGIO:
                static::writeConnectionsStatisticsToStatusFile();
                break;
        }
    }

信号说明

  • SIGINT/SIGTERM - 立即停止
  • SIGQUIT - 优雅停止(等待连接关闭)
  • SIGUSR1 - 普通重载
  • SIGUSR2 - 优雅重载
  • SIGIOT - 显示状态
  • SIGIO - 显示连接状态

总结

核心设计模式

  1. 事件驱动模式:基于 EventLoop 实现异步非阻塞
  2. 多进程模型:Master-Worker 架构,提高并发能力
  3. 策略模式:多种 EventLoop 实现可插拔
  4. 观察者模式:通过回调函数处理事件

性能优化点

  1. 非阻塞 I/O:所有 Socket 设置为非阻塞
  2. 事件复用:EventLoop 统一管理所有 I/O 事件
  3. 缓冲区管理:读写缓冲区减少系统调用
  4. 协议缓存:HTTP 请求对象缓存(减少对象创建)

关键文件索引

  • src/Worker.php - 核心进程管理
  • src/Events/*.php - EventLoop 实现
  • src/Connection/TcpConnection.php - TCP 连接管理
  • src/Timer.php - 定时器系统
  • src/Protocols/*.php - 协议实现

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容