服务器的演进
1)单进程阻塞的网络服务器
说明:
1.创建一个socket,绑定服务器端口(bind),监听端口(listen),在PHP中用stream_socket_server一个函数就能完成上面3个步骤
2.进入while循环,阻塞在accept操作上,等待客户端连接进入。此时程序会进入睡眠状态,直到有新的客户端发起connect到服务器,操作系统会唤醒此进程。accept函数返回客户端连接的socket
1.利用fread读取客户端socket当中的数据收到数据后服务器程序进行处理然后使用fwrite向客户端发送响应。长连接的服务会持续与客户端交互,而短连接服务一般收到响应就会close。
缺点:
一次只能处理一个连接,不支持多个连接同时处理
每个连接进入到我们的服务端的时候,单独创建一个进程/线程提供服务
简单实现
单进程阻塞的网络服务器
1 创建一个socket,绑定服务器端口(bind),监听端口(listen),在PHP中用stream_socket_server一个函数就能完成上面3个步骤
2 进入while循环,阻塞在accept操作上,等待客户端连接进入。此时程序会进入睡眠状态,直到有新的客户端发起connect到服务器,操作系统会唤醒此进程。
accept函数返回客户端连接的socket
3 利用fread读取客户端socket当中的数据收到数据后服务器程序进行处理然后使用fwrite向客户端发送响应。
长连接的服务会持续与客户端交互,而短连接服务一般收到响应就会close。
缺点:
1 一次只能处理一个连接,不支持多个连接同时处理
2 每个连接进入到我们的服务端的时候,单独创建一个进程/线程提供服务
class Worker{
protected $socket = null;
public $onMessage = null;
public $onConnect = null;
public function __construct($socket_address)
{
//绑定地址监听端口
$this->socket = stream_socket_server($socket_address);
}
public function start(){
while (true){
//阻塞监听客户端socket状态如连接成功发送的消息等,调用相应回调 直到返回
$clientSocket = stream_socket_accept($this->socket);//返回客户端资源
if(!empty($clientSocket) && is_callable($this->onConnect)){
call_user_func($this->onConnect,$clientSocket);
}
//从连接当中读取客户端的内容
$buffer=fread($clientSocket,65535);
//正常读取到数据,触发消息接收事件,响应内容
if(!empty($buffer) && is_callable($this->onMessage)){
call_user_func($this->onMessage,$clientSocket,$buffer);
}
//如果不关闭连接不能支持大点的并发请求
//fclose($clientSocket);
}
}
}
$server = new Worker('tcp://0.0.0.0:9800');
//客户端连接成功触发
$server->onConnect = function ($fd){
echo '连接事件触发',(int)$fd,PHP_EOL;
};
//客户端端发消息过来触发
$server->onMessage = function ($conn, $message){
//事件回调当中写业务逻辑
//var_dump($conn,$message);
$content="我是peter";
$http_resonse = "HTTP/1.1 200 OK\r\n";
$http_resonse .= "Content-Type: text/html;charset=UTF-8\r\n";
// $http_resonse .= "Connection: keep-alive\r\n"; //连接保持
$http_resonse .= "Server: php socket server\r\n";
$http_resonse .= "Content-length: ".strlen($content)."\r\n\r\n";
$http_resonse .= $content;
fwrite($conn, $http_resonse);
};
$server->start(); //启动
结论:
一次只能处理一个连接,不支持多个连接同时处理
每个连接进入到我们的服务端的时候,单独创建一个进程/线程提供服务
1)预派生子进程模式 类似php-fpm
1、程序启动后就会创建N个进程。每个子进程进入 Accept,等待新的连接进入。当客户端连接到服务器时,其中一个子进程会被唤醒,开始处理客户端请求,并且不再接受新的TCP连接。当此连接关闭时,子进程会释放,重新进入 Accept,参与处理新的连接。这个模型的优势是完全可以复用进程,不需要太多的上下文切换,比如php-fpm基于此模型的。
缺点:
1.这种模型严重依赖进程的数量解决并发问题,一个客户端连接就需要占用一个进程,工作进程的数量有多少,并发处理能力就有多少。操作系统可以创建的进程数量是有限的。
2、操作系统生成一个子进程需要进行内存复制等操作,在资源和时间上会产生一定的开销;当有大量请求时,会导致系统性能下降;
例如:即时聊天程序,一台服务器可能要维持数十万的连接,那么就要启动数十万的进程来维持。这显然不可能
基于上面的模式我们发现我们只能通过每次(accept)处理单个请求,没办法一次性处理多个请求?
简单实现如下
class Worker{
protected $socket = null;
public $onMessage = null;
public $onConnect = null;
public $workNum = 10;
public function __construct($socket_address)
{
//绑定地址监听端口
$this->socket = stream_socket_server($socket_address);
}
public function start() {
//获取配置文件
$this->fork(); //用来创建多个助教老师,创建多个子进程负责接收请求的
}
public function fork(){
for ($i=0;$i<$this->workNum;$i++){
$pid = pcntl_fork();//下面的代码父子进程都会执行
if ($pid<0){
exit('创建失败');
}elseif($pid>0){
// $status=0;
// $pid=pcntl_wait($status);这边会阻塞等待子进程结束后在创建进程 所以放到for 后面 等待子进程创建 执行完成 回收子进程
// echo "子进程回收了:$pid".PHP_EOL;
}else{
$this->accept();
return; //这边要return 否则子进程还会创建子进程 因为fork 在for 循环里面当然这里在阻塞监听
}
}
//放在父进程空间,结束的子进程信息,阻塞状态
$status=0;
$pid=pcntl_wait($status);
echo "子进程回收了:$pid".PHP_EOL;
}
public function accept(){
//创建多个子进程阻塞接收服务端socket
while (true){
$clientSocket=stream_socket_accept($this->socket); //阻塞监听
var_dump(posix_getpid());
//触发事件的连接的回调
if(!empty($clientSocket) && is_callable($this->onConnect)){
call_user_func($this->onConnect,$clientSocket);
}
//从连接当中读取客户端的内容
$buffer=fread($clientSocket,65535);
//正常读取到数据,触发消息接收事件,响应内容
if(!empty($buffer) && is_callable($this->onMessage)){
call_user_func($this->onMessage,$clientSocket,$buffer);
}
fclose($clientSocket); //必须关闭,子进程不会释放不会成功拿下进入accpet
}
}
}
$server = new Worker('tcp://0.0.0.0:9800');
//客户端连接成功触发
$server->onConnect = function ($fd){
echo '连接事件触发',(int)$fd,PHP_EOL;
};
//客户端端发消息过来触发
$server->onMessage = function ($conn, $message){
//事件回调当中写业务逻辑
//var_dump($conn,$message);
$content="我是peter";
$http_resonse = "HTTP/1.1 200 OK\r\n";
$http_resonse .= "Content-Type: text/html;charset=UTF-8\r\n";
// $http_resonse .= "Connection: keep-alive\r\n"; //连接保持
$http_resonse .= "Server: php socket server\r\n";
$http_resonse .= "Content-length: ".strlen($content)."\r\n\r\n";
$http_resonse .= $content;
fwrite($conn, $http_resonse);
};
$server->start(); //启动
3)单进程阻塞复用的网络服务器
说明:
服务监听流程如上
1.保存所有的socket,通过select系统调用,监听socket描述符的可读事件
2.Select会在内核空间监听一旦发现socket可读,会从内核空间传递至用户空间,在用户空间通过逻辑判断是服务端socket可读,还是客户端的socket可读
3.如果是服务端的socket可读,说明有新的客户端建立,将socket保留到监听数组当中 【第一次建立连接服务端可读】
1.如果是客户端的socket可读,说明当前已经可以去读取客户端发送过来的内容了,读取内容,然后响应给客户端。【客户端给服务器发送数据 客户端可读】
缺点:
1.select模式本身的缺点(1、循环遍历处理事件、2、内核空间传递数据的消耗)
2.单进程对于大量任务处理乏力
class Worker{
//监听socket
protected $socket = NULL;
//连接事件回调
public $onConnect = NULL;
//接收消息事件回调
public $onMessage = NULL;
public $workerNum=4; //子进程个数
public $allSocket; //存放所有socket
public function __construct($socket_address) {
//监听地址+端口
$this->socket=stream_socket_server($socket_address);
//1、stream_set_blocking 当 socket处于阻塞模式时,
//比如:网络io fread系统调用必须等待socket有数据返回,即进程因系统调用阻塞;相反若处于非阻塞模式,内核不管socket数据有没有准备好,都会立即返回给进程。
//2、stream_set_blocking 另外进程阻塞和socket阻塞不是一个概念,进程阻塞是因为系统调用所致,socket是否阻塞只是说明socket上事件是不是可以内核即刻处理。
//1、select是系统调用,必然会阻塞进程的,和socket是否阻塞并没有关系,我第2点备注了呢。
//2、这里的IO就是针对socket的网络IO,是否是阻塞的,正是你题示所问的问题。
//3、socket之所以设置成非阻塞,是为了同一个进程里可以更多的处理更多的tcp连接,这正是 select、poll 或者 epoll等多路复用模型能够处理高并发的原因所在。
stream_set_blocking($this->socket,0); //设置网络io 比如 fread 非阻塞
// 0是非阻塞,1是阻塞
//阻塞的意义是什么呢?
//某个函数读取一个网络流,当没有未读取字节的时候,程序该怎么办?
//是一直等待,直到下一个未读取的字节的出现,还是立即告诉调用者当前没有新内容?
//前者是阻塞的,后者是非阻塞的。
//
//阻塞的好处是,排除其它非正常因素,阻塞的是按顺序执行的同步的读取。
//
//借用小说里的说法就是“神刀出鞘,无血不归”。在读到新内容之前,它不会往下走,什么别的事情都不做。
//
//而非阻塞,因为不必等待内容,所以能异步的执行,现在读到读不到都没关系,执行读取操作后立刻就继续往下做别的事情。
$this->allSocket[(int)$this->socket]=$this->socket;
}
public function start() {
//获取配置文件
$this->fork();
}
public function fork(){
$this->accept();//子进程负责接收客户端请求
}
public function accept(){
//创建多个子进程阻塞接收服务端socket
while (true){
$write=$except=[];
//需要监听socket
$read=$this->allSocket;
//建议socket状态谁改变
// var_dump($read);
stream_select($read,$write,$except,60);//内核遍历循环哪些改变会阻塞 如果只有一个改变也会循环很多次的问题
//怎么区分服务端跟客户端刚启动服务没有客户端连接进来socket 没有改变 当有新的客户端连接进来当前改变的是服务端所以循环read
foreach ($read as $index=>$val){
//循环每一个改变 返回响应
//当前发生改变的是服务端,有连接进入
if($val === $this->socket){
$clientSocket=stream_socket_accept($this->socket); //阻塞监听
//触发事件的连接的回调
if(!empty($clientSocket) && is_callable($this->onConnect)){
call_user_func($this->onConnect,$clientSocket);
}
$this->allSocket[(int)$clientSocket]=$clientSocket;//先把资源放入数组 客户端可写时循环响应避免阻塞
}else{
//从连接当中读取客户端的内容
$buffer=fread($val,1024);
//如果数据为空,或者为false,不是资源类型
if(empty($buffer)){
if(feof($val) || !is_resource($val)){
//触发关闭事件
fclose($val);
unset($this->allSocket[(int)$val]);
continue;
}
}
//正常读取到数据,触发消息接收事件,响应内容
if(!empty($buffer) && is_callable($this->onMessage)){
call_user_func($this->onMessage,$val,$buffer);
}
}
}
}
}
}
$worker = new Worker('tcp://0.0.0.0:9800');
//连接事件
$worker->onConnect = function ($fd) {
//echo '连接事件触发',(int)$fd,PHP_EOL;
};
//消息接收
$worker->onMessage = function ($conn, $message) {
//事件回调当中写业务逻辑
//var_dump($conn,$message);
$content="我是peter";
$http_resonse = "HTTP/1.1 200 OK\r\n";
$http_resonse .= "Content-Type: text/html;charset=UTF-8\r\n";
$http_resonse .= "Connection: keep-alive\r\n"; //连接保持
$http_resonse .= "Server: php socket server\r\n";
$http_resonse .= "Content-length: ".strlen($content)."\r\n\r\n";
$http_resonse .= $content;
fwrite($conn, $http_resonse);
};
$worker->start(); //启动
4)多进程master-worker模型
1.master进程,负责处理配置文件读取,启动,终止和维护工作(worker)进程数,当woker进程退出后(异常情况下),会自动重新启动新的woker
2.worker进程的主要任务是完成具体的任务逻辑,启动端口监听,接收客户端请求、使用epoll接收请求,执行业务逻辑然后关闭连接。
是类似于nginx和workmen采用的Reactor 多进程的模式,具体差异表现为主进程中仅仅创建了监听,并没有创建 mainReactor 来“accept”连接,而是由子进程的 Reactor 来“accept”连接,通过负载均衡,一次只有一个子进程进行“accept”,子进程“accept”新连接后就放到自己的 Reactor中进行处理,不会再分配给其他子进程 区别swoole
epoll:
epoll是在2.6内核中提出的,是之前的select和poll的增强版本。相对于select和poll来说,epoll更加灵活,没有描述符限制,无需轮询。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中。
简单点来说就是当连接有I/O流事件产生的时候,epoll就会去告诉进程哪个连接有I/O流事件产生,然后进程就去处理这个事件
也就是执行回调。
class Worker{
//监听socket
protected $socket = NULL;
//连接事件回调
public $onConnect = NULL;
//接收消息事件回调
public $onMessage = NULL;
public $workerNum=4; //子进程个数
public $allSocket; //存放所有socket
public function __construct($socket_address) {
//监听地址+端口
$this->socket=stream_socket_server($socket_address);
}
public function start() {
//获取配置文件
$this->fork();
}
public function fork(){
$this->accept();//子进程负责接收客户端请求
}
public function accept(){
//第一个需要监听的事件(服务端socket的事件),一旦监听到可读事件之后会触发
swoole_event_add($this->socket,function ($fd){
$clientSocket=stream_socket_accept($fd);
//触发事件的连接的回调
if(!empty($clientSocket) && is_callable($this->onConnect)){
call_user_func($this->onConnect,$clientSocket);
}
//监听客户端可读
swoole_event_add($clientSocket,function ($fd){
//从连接当中读取客户端的内容
$buffer=fread($fd,1024);
//如果数据为空,或者为false,不是资源类型
if(empty($buffer)){
if(feof($fd) || !is_resource($fd)){
//触发关闭事件
fclose($fd);
}
}
//正常读取到数据,触发消息接收事件,响应内容
if(!empty($buffer) && is_callable($this->onMessage)){
call_user_func($this->onMessage,$fd,$buffer);
}
});
});
echo "非阻塞";
}
}
$worker = new Worker('tcp://0.0.0.0:9805');
//连接事件
$worker->onConnect = function ($fd) {
//echo '连接事件触发',(int)$fd,PHP_EOL;
};
//消息接收
$worker->onMessage = function ($conn, $message) {
//事件回调当中写业务逻辑
//var_dump($conn,$message);
$content="我是peter";
$http_resonse = "HTTP/1.1 200 OK\r\n";
$http_resonse .= "Content-Type: text/html;charset=UTF-8\r\n";
$http_resonse .= "Connection: keep-alive\r\n"; //连接保持
$http_resonse .= "Server: php socket server\r\n";
$http_resonse .= "Content-length: ".strlen($content)."\r\n\r\n";
$http_resonse .= $content;
fwrite($conn, $http_resonse);
};
$worker->start(); //启动
高效的事件处理模式Reactor 模式 例如swoole
Reactor模型,Reactor顾名思义就是反应堆的意思,它本身不处理任何数据收发。只是可以监视一个socket句柄的事件变化。
1) 主进程/线程往epoll内核亊件中注册socket上的读就绪亊件。
2) 主进程/线程调用epoll_wait等待socket上有数据可读。
3) 当socket上有数据可读时,epoll_wait通知主进程/线程。主进程/线程则将socket可读事件放人请求队列。
4) 睡眠在请求队列上的某个工作线程被唤醒,它从socket读取数据,并处理客户请求, 然后往epoll内核事件表中注册该socket上的写就绪事件。
5) 主线程调用epoll_wait等待socket可写。
6) 当socket可写时,epoll_wait通知主进程/线程将socket可写亊件放人清求队列。
7) 睡眠在请求队列上的某个工作线程被唤醒,它往socket上写人服务器处理客户淸求
例如 swoole 中
每一个线程都有自己的用途,下面多每个线程有一个了解
1.1、MainReactor(主线程)
主线程会负责监听server socket,如果有新的连接accept,主线程会评估每个Reactor线程的连接数量。将此连接分配给连接数最少的reactor线程,做一个负载均衡。
1.2 、Reactor线程组
Reactor线程负责维护客户端机器的TCP连接、处理网络IO、收发数据完全是异步非阻塞的模式。
swoole的主线程在Accept新的连接后,会将这个连接分配给一个固定的Reactor线程,在socket可读时读取数据,并进行协议解析,将请求投递到Worker进程。在socket可写时将数据发送给TCP客户端。
nginx 类似 workermen
是类似于nginx采用的Reactor 多进程的模式,具体差异表现为主进程中仅仅创建了监听,并没有创建 mainReactor 来“accept”连接,而是由子进程的 Reactor 来“accept”连接,通过负载均衡,一次只有一个子进程进行“accept”,子进程“accept”新连接后就放到自己的 Reactor中进行处理,不会再分配给其他子进程
例
class Worker{
//监听socket
protected $socket = NULL;
//连接事件回调
public $onConnect = NULL;
public $reusePort=1;
//接收消息事件回调
public $onMessage = NULL;
public $workerNum=3; //子进程个数
public $allSocket; //存放所有socket
public $addr;
public function __construct($socket_address) {
//监听地址+端口
$this->addr=$socket_address;
}
public function start() {
//获取配置文件
$this->fork();
}
public function fork(){
for ($i=0;$i<$this->workerNum;$i++){
$pid=pcntl_fork(); //创建成功会返回子进程id
if($pid<0){
exit('创建失败');
}else if($pid>0){
//父进程空间,返回子进程id
}else{ //返回为0子进程空间
$this->accept();//子进程负责接收客户端请求
exit; //父进程继续执行循环创建子进程
}
}
//放在父进程空间,结束的子进程信息,阻塞状态
$status=0;
for ($i=0;$i<$this->workerNum;$i++) {
$pid = pcntl_wait($status);
}
}
public function accept(){
$opts = array(
'socket' => array(
'backlog' =>10240, //成功建立socket连接的等待个数
),
);
$context = stream_context_create($opts);
//开启多端口监听,并且实现负载均衡
stream_context_set_option($context,'socket','so_reuseport',1);
stream_context_set_option($context,'socket','so_reuseaddr',1);
$this->socket=stream_socket_server($this->addr,$errno,$errstr,STREAM_SERVER_BIND|STREAM_SERVER_LISTEN,$context);
//第一个需要监听的事件(服务端socket的事件),一旦监听到可读事件之后会触发
swoole_event_add($this->socket,function ($fd){
$clientSocket=stream_socket_accept($fd);
//触发事件的连接的回调
if(!empty($clientSocket) && is_callable($this->onConnect)){
call_user_func($this->onConnect,$clientSocket);
}
//监听客户端可读
swoole_event_add($clientSocket,function ($fd){
//从连接当中读取客户端的内容
$buffer=fread($fd,1024);
//如果数据为空,或者为false,不是资源类型
if(empty($buffer)){
if(!is_resource($fd) || feof($fd) ){
//触发关闭事件
fclose($fd);
}
}
//正常读取到数据,触发消息接收事件,响应内容
if(!empty($buffer) && is_callable($this->onMessage)){
call_user_func($this->onMessage,$fd,$buffer);
}
});
});
}
}
$worker = new Worker('tcp://0.0.0.0:9810');
//开启多进程的端口监听
$worker->reusePort = true;
//连接事件
$worker->onConnect = function ($fd) {
//echo '连接事件触发',(int)$fd,PHP_EOL;
};
//消息接收
$worker->onMessage = function ($conn, $message) {
//事件回调当中写业务逻辑
// $a=include 'index.php';
// var_dump($a);
//var_dump($conn,$message);
$content="我是peter";
$http_resonse = "HTTP/1.1 200 OK\r\n";
$http_resonse .= "Content-Type: text/html;charset=UTF-8\r\n";
$http_resonse .= "Connection: keep-alive\r\n"; //连接保持
$http_resonse .= "Server: php socket server\r\n";
$http_resonse .= "Content-length: ".strlen($content)."\r\n\r\n";
$http_resonse .= $content;
fwrite($conn, $http_resonse);
};
$worker->start(); //启动