php 使用sse实现消息实时推送2025-01-04

image.png
  • sse_server.php
<?php

/**
 * SSE(Server-Sent Events)函数,用于向客户端推送实时数据
 *
 * @param callable $dataCallback 数据回调函数,用于生成需要发送的数据
 * @param int $interval 数据发送间隔(毫秒)
 * @param int $heartbeatInterval 心跳检测间隔(秒)
 * @param int $maxExecutionTime 脚本最大执行时间(秒),0 表示无限制
 * @param bool $enableHeartbeat 是否启用心跳检测
 */
function sse($dataCallback, $interval = 2, $heartbeatInterval = 50, $maxExecutionTime = 0, $enableHeartbeat = true)
{

    ini_set('output_buffering', 'off'); // 禁用输出缓冲,确保数据实时发送到客户端
    ini_set('zlib.output_compression', 0); // 禁用压缩
    ini_set('implicit_flush', 1); // 启用隐式刷新,确保每次输出后立即发送到客户端

    // 设置SSE相关的HTTP头
    header('Content-Type: text/event-stream'); // 声明内容类型为事件流
    header('Cache-Control: no-cache'); // 禁用缓存,确保数据实时更新
    header('Connection: keep-alive'); // 保持连接
    header('X-Accel-Buffering: no'); // 禁用 Nginx 缓冲

    // 关闭脚本时间限制
    if ($maxExecutionTime > 0) {
        set_time_limit($maxExecutionTime); // 设置最大执行时间
    } else {
        set_time_limit(0); // 无时间限制
    }

    // 禁用输出缓冲
    // 关闭所有输出缓冲区并丢弃内容
    while (ob_get_level() > 0) {
        ob_end_clean();
    }

    // 初始化心跳检测计数器
    $lastHeartbeat = time();

    // 主循环
    while (true) {
        try {
            // 检查客户端是否断开连接
            if (connection_aborted()) {
                break;
            }

            // 调用回调函数获取数据
            $data = $dataCallback();

            // 发送数据
            if (!empty($data)) {
                $event = isset($data['event']) ? $data['event'] : 'message'; // 默认事件类型为 'message'
                $payload = isset($data['data']) ? $data['data'] : $data; // 数据部分
                echo "event: $event\n";
                echo "data: " . json_encode($payload) . "\n\n";
            }

            // 发送心跳检测(注释行)
            if ($enableHeartbeat && time() - $lastHeartbeat >= $heartbeatInterval) {
                echo ": heartbeat\n\n"; // 心跳检测
                $lastHeartbeat = time();
            }

            // 刷新输出缓冲区
            ob_flush();
            flush();

            // 等待指定间隔
            usleep($interval * 100000); // 使用微秒级睡眠,更精确
        } catch (\Exception $e) {
            // 发送错误信息
            echo "event: error\ndata: " . json_encode(['message' => $e->getMessage()]) . "\n\n";
            ob_flush();
            flush();
            break; // 退出循环
        }
    }

    // 关闭连接时清理
    echo "event: close\ndata: Connection closed by server\n\n";
    ob_flush();
    flush();
    usleep(100000); // 延迟 100 毫秒,确保消息发送完毕
    exit();
}

/**
 * 示例回调函数,用于生成数据
 *
 * @return array
 */
function generateData()
{
    static $counter = 0;
    $counter++;
    return [
        'event' => 'update', // 自定义事件类型
        'data' => [
            'time' => date('Y-m-d H:i:s'),
            'counter' => $counter,
            'message' => 'This is a message from the server.'
        ]
    ];
}

// 使用 SSE 函数
try {
    sse('generateData', 1, 55); // 每1秒发送一次数据,每55秒发送一次心跳,最大运行时间300秒
} catch (\Exception $e) {
    error_log('SSE Error: ' . $e->getMessage());
}

  • sse_client.html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>SSE Client</title>
    <style>
        #messages {
            height: 300px;
            overflow-y: auto;
            border: 1px solid #ccc;
            padding: 10px;
            margin-top: 10px;
        }
    </style>
</head>
<body>
    <h1>SSE Client</h1>
    <div id="messages"></div>

    <script>
        // 创建 EventSource 对象,连接到 SSE 服务端
        const eventSource = new EventSource('sse_server.php');

        // 添加消息到页面
        function addMessage(message) {
            const p = document.createElement('p');
            p.textContent = message;
            const messagesDiv = document.getElementById('messages');
            messagesDiv.appendChild(p);

            // 自动滚动到底部
            messagesDiv.scrollTop = messagesDiv.scrollHeight;
        }

        // 监听 message 事件
        eventSource.addEventListener('message', function(event) {
            try {
                const data = JSON.parse(event.data); // 解析服务器发送的数据
                if (data && data.time && data.counter !== undefined && data.message) {
                    const message = `[${data.time}] Counter: ${data.counter}, Message: ${data.message}`;
                    addMessage(message);
                }
            } catch (error) {
                console.error('Failed to parse event data:', error);
            }
        });

        // 监听 heartbeat 事件
        eventSource.addEventListener('heartbeat', function(event) {
            console.log('Heartbeat received:', event.data);
            addMessage('[HEARTBEAT] Server is alive.');
        });

        // 监听服务器关闭事件
        eventSource.addEventListener('close', function(event) {
            console.log('Server closed the connection.');
            addMessage('[CLOSE] Connection closed by server.');
            eventSource.close(); // 关闭连接
        });

        // 监听错误事件
        eventSource.onerror = function(event) {
            if (event.eventPhase === EventSource.CLOSED) {
                console.log('SSE connection closed by server.');
            } else {
                console.error('SSE connection error:', event);
            }
            eventSource.close(); // 关闭连接
        };
    </script>
</body>
</html>
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容