image.png
- sse_server.php
<?php
/**
* SSE(Server-Sent Events)函数,用于向客户端推送实时数据
*
* @param callable $dataCallback 数据回调函数,用于生成需要发送的数据
* @param int $interval 数据发送间隔(毫秒)
* @param int $heartbeatInterval 心跳检测间隔(秒)
* @param int $maxExecutionTime 脚本最大执行时间(秒),0 表示无限制
* @param bool $enableHeartbeat 是否启用心跳检测
*/
function sse($dataCallback, $interval = 2, $heartbeatInterval = 50, $maxExecutionTime = 0, $enableHeartbeat = true)
{
ini_set('output_buffering', 'off'); // 禁用输出缓冲,确保数据实时发送到客户端
ini_set('zlib.output_compression', 0); // 禁用压缩
ini_set('implicit_flush', 1); // 启用隐式刷新,确保每次输出后立即发送到客户端
// 设置SSE相关的HTTP头
header('Content-Type: text/event-stream'); // 声明内容类型为事件流
header('Cache-Control: no-cache'); // 禁用缓存,确保数据实时更新
header('Connection: keep-alive'); // 保持连接
header('X-Accel-Buffering: no'); // 禁用 Nginx 缓冲
// 关闭脚本时间限制
if ($maxExecutionTime > 0) {
set_time_limit($maxExecutionTime); // 设置最大执行时间
} else {
set_time_limit(0); // 无时间限制
}
// 禁用输出缓冲
// 关闭所有输出缓冲区并丢弃内容
while (ob_get_level() > 0) {
ob_end_clean();
}
// 初始化心跳检测计数器
$lastHeartbeat = time();
// 主循环
while (true) {
try {
// 检查客户端是否断开连接
if (connection_aborted()) {
break;
}
// 调用回调函数获取数据
$data = $dataCallback();
// 发送数据
if (!empty($data)) {
$event = isset($data['event']) ? $data['event'] : 'message'; // 默认事件类型为 'message'
$payload = isset($data['data']) ? $data['data'] : $data; // 数据部分
echo "event: $event\n";
echo "data: " . json_encode($payload) . "\n\n";
}
// 发送心跳检测(注释行)
if ($enableHeartbeat && time() - $lastHeartbeat >= $heartbeatInterval) {
echo ": heartbeat\n\n"; // 心跳检测
$lastHeartbeat = time();
}
// 刷新输出缓冲区
ob_flush();
flush();
// 等待指定间隔
usleep($interval * 100000); // 使用微秒级睡眠,更精确
} catch (\Exception $e) {
// 发送错误信息
echo "event: error\ndata: " . json_encode(['message' => $e->getMessage()]) . "\n\n";
ob_flush();
flush();
break; // 退出循环
}
}
// 关闭连接时清理
echo "event: close\ndata: Connection closed by server\n\n";
ob_flush();
flush();
usleep(100000); // 延迟 100 毫秒,确保消息发送完毕
exit();
}
/**
* 示例回调函数,用于生成数据
*
* @return array
*/
function generateData()
{
static $counter = 0;
$counter++;
return [
'event' => 'update', // 自定义事件类型
'data' => [
'time' => date('Y-m-d H:i:s'),
'counter' => $counter,
'message' => 'This is a message from the server.'
]
];
}
// 使用 SSE 函数
try {
sse('generateData', 1, 55); // 每1秒发送一次数据,每55秒发送一次心跳,最大运行时间300秒
} catch (\Exception $e) {
error_log('SSE Error: ' . $e->getMessage());
}
- sse_client.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>SSE Client</title>
<style>
#messages {
height: 300px;
overflow-y: auto;
border: 1px solid #ccc;
padding: 10px;
margin-top: 10px;
}
</style>
</head>
<body>
<h1>SSE Client</h1>
<div id="messages"></div>
<script>
// 创建 EventSource 对象,连接到 SSE 服务端
const eventSource = new EventSource('sse_server.php');
// 添加消息到页面
function addMessage(message) {
const p = document.createElement('p');
p.textContent = message;
const messagesDiv = document.getElementById('messages');
messagesDiv.appendChild(p);
// 自动滚动到底部
messagesDiv.scrollTop = messagesDiv.scrollHeight;
}
// 监听 message 事件
eventSource.addEventListener('message', function(event) {
try {
const data = JSON.parse(event.data); // 解析服务器发送的数据
if (data && data.time && data.counter !== undefined && data.message) {
const message = `[${data.time}] Counter: ${data.counter}, Message: ${data.message}`;
addMessage(message);
}
} catch (error) {
console.error('Failed to parse event data:', error);
}
});
// 监听 heartbeat 事件
eventSource.addEventListener('heartbeat', function(event) {
console.log('Heartbeat received:', event.data);
addMessage('[HEARTBEAT] Server is alive.');
});
// 监听服务器关闭事件
eventSource.addEventListener('close', function(event) {
console.log('Server closed the connection.');
addMessage('[CLOSE] Connection closed by server.');
eventSource.close(); // 关闭连接
});
// 监听错误事件
eventSource.onerror = function(event) {
if (event.eventPhase === EventSource.CLOSED) {
console.log('SSE connection closed by server.');
} else {
console.error('SSE connection error:', event);
}
eventSource.close(); // 关闭连接
};
</script>
</body>
</html>