TCP服务器
//创建Server对象,监听 127.0.0.1:9501端口
$serv = new swoole_server("127.0.0.1", 9501);
//监听连接进入事件
$serv->on('connect', function ($serv, $fd) {
echo "Client: Connect.\n";
});
//监听数据接收事件
$serv->on('receive', function ($serv, $fd, $from_id, $data) {
$serv->send($fd, "Server: ".$data);
});
//监听连接关闭事件
$serv->on('close', function ($serv, $fd) {
echo "Client: Close.\n";
});
//启动服务器
$serv->start();
UDP服务器
//创建Server对象,监听 127.0.0.1:9502端口,类型为SWOOLE_SOCK_UDP
$serv = new swoole_server("127.0.0.1", 9502, SWOOLE_PROCESS, SWOOLE_SOCK_UDP);
//监听数据接收事件
$serv->on('Packet', function ($serv, $data, $clientInfo) {
$serv->sendto($clientInfo['address'], $clientInfo['port'], "Server ".$data);
var_dump($clientInfo);
});
//启动服务器
$serv->start();
http服务器
$http = new swoole_http_server("0.0.0.0", 9501);
$http->on('request', function ($request, $response) {
var_dump($request->get, $request->post);
$response->header("Content-Type", "text/html; charset=utf-8");
$response->end("<h1>Hello Swoole. #".rand(1000, 9999)."</h1>");
});
$http->start();
websocket服务器
服务器端:
//创建websocket服务器对象,监听0.0.0.0:9502端口
$ws = new swoole_websocket_server(“0.0.0.0”, 9502);
//监听WebSocket连接打开事件
$ws->on('open', function ($ws, $request) {
var_dump($request->fd, $request->get, $request->server);
$ws->push($request->fd, "hello, welcome\n");
});
//监听WebSocket消息事件
$ws->on('message', function ($ws, $frame) {
echo "Message: {$frame->data}\n";
$ws->push($frame->fd, "server: {$frame->data}");
});
//监听WebSocket连接关闭事件
$ws->on('close', function ($ws, $fd) {
echo "client-{$fd} is closed\n";
});
$ws->start();
客户端JS:
var wsServer = 'ws://192.168.50.151:9502';
var websocket = new WebSocket(wsServer);
websocket.onopen = function (evt) {
console.log("Connected to WebSocket server.");
};
websocket.onclose = function (evt) {
console.log("Disconnected");
};
websocket.onmessage = function (evt) {
console.log('Retrieved data from server: ' + evt.data);
};
websocket.onerror = function (evt, e) {
console.log('Error occured: ' + evt.data);
};
辅助
定时器
//每隔2000ms触发一次
swoole_timer_tick(2000, function ($timer_id) {
echo "tick-2000ms\n";
});
//3000ms后执行此函数
swoole_timer_after(3000, function () {
echo "after 3000ms.\n";
});
异步tcp服务器处理任务
$serv = new swoole_server("127.0.0.1", 9501);
//设置异步任务的工作进程数量
$serv->set(array('task_worker_num' => 4));
$serv->on('receive', function($serv, $fd, $from_id, $data) {
//投递异步任务
$task_id = $serv->task($data);
echo "Dispath AsyncTask: id=$task_id\n";
});
//处理异步任务
$serv->on('task', function ($serv, $task_id, $from_id, $data) {
echo "New AsyncTask[id=$task_id]".PHP_EOL;
//返回任务执行的结果
$serv->finish("$data -> OK");
});
//处理异步任务的结果
$serv->on('finish', function ($serv, $task_id, $data) {
echo "AsyncTask[$task_id] Finish: $data".PHP_EOL;
});
$serv->start();
tcp同步客户端
$client = new swoole_client(SWOOLE_SOCK_TCP);
//连接到服务器
if (!$client->connect('127.0.0.1', 9501, 0.5))
{
die("connect failed.");
}
//向服务器发送数据
if (!$client->send("hello world"))
{
die("send failed.");
}
//从服务器接收数据
$data = $client->recv();
if (!$data)
{
die("recv failed.");
}
echo $data;
//关闭连接
$client->close();
tcp异步客户端
$client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);
//注册连接成功回调
$client->on("connect", function($cli) {
$cli->send("hello world\n");
});
//注册数据接收回调
$client->on("receive", function($cli, $data){
echo "Received: ".$data."\n";
});
//注册连接失败回调
$client->on("error", function($cli){
echo "Connect failed\n";
});
//注册连接关闭回调
$client->on("close", function($cli){
echo "Connection close\n";
});
//发起连接
$client->connect('127.0.0.1', 9501, 0.5);
自定义通讯协议设计
进程/协程管理
进程
单独进程
$process = new swoole_process('callback_function', true);
$pid = $process->start();
function callback_function(swoole_process $worker){
$worker->exec('/usr/bin/php', array(__DIR__.'/write_file.php'));
}// 启用本地的命令,加上绝对路径
swoole_process::wait();
【子进程】管道缓冲区读写及事件监听?
$workers = [];
$worker_num = 3;//创建的进程数
for($i=0;$i<$worker_num; $i++){
$process = new swoole_process('process');// 创建子进程 启动函数为 process
$pid = $process->start();// 子进程启动
$workers[$pid] = $process;// 存入字符数组
}
foreach($workers as $process){
//子进程会包含此事件,加入到子进程中 异步IO
swoole_event_add($process->pipe, function ($pipe) use($process){
$data = $process->read();
echo "RECV: " . $data.PHP_EOL;// 接收数据
});//函数调用的神奇情况
}
function process(swoole_process $process){// 第一个处理
$process->write("processId:".$process->pid);// 子进程写入信息到管道。
echo "echo:".$process->pid."\t".$process->callback .PHP_EOL;// 打印提示信息结果 及制表符
}
// 两次等待子进程结束
for($i=0; $i<$worker_num; $i++){// 回收子进程 否则出现僵尸进程
$ret = swoole_process::wait();// 回收结束运行的子进程,如果子进程结束。类似于 join
$pid = $ret['pid'];
unset($workers[$pid]);
echo "子进程退出, PID=".$pid.PHP_EOL;
}
队列读写
// 进程通信
$workers = [];// 进程仓库
$worker_num = 2;// 最大进程数
// 循环创建子进程
for($i = 0; $i < $worker_num; $i++){
$process = new swoole_process('callback_function', false, false);
$process->useQueue();// 开启队列使用,类似于全局队列
$pid = $process->start();//开启进程
$workers[$pid] = $process;// 存入句柄仓库
}
// 子进程执行函数
function callback_function(swoole_process $worker){
sleep(2);//睡觉2秒
$recv = $worker->pop();// 获取队列数据
echo "从主进程获取数据: $recv\n";
$worker->exit(0);// 当前子进程结束
}
// 主进程内,新增队列数据
foreach($workers as $pid => $process){
$process->push("Hello 子进程[$pid]\n");
}
// 两次等待子进程结束
for($i = 0; $i < $worker_num; $i++){
// 回收子进程 否则出现僵尸进程
$ret = swoole_process::wait();// 回收结束运行的子进程,如果子进程结束。类似于 join
$pid = $ret['pid'];
unset($workers[$pid]);
echo "子进程退出, PID=".$pid.PHP_EOL;
}
循环触发进程
// 循环定时执行
// 定时器触发函数
swoole_process::signal(SIGALRM, function () {
static $i = 0;
echo "#{$i}\t alarm\n";
$i++;
if ($i > 20) {
swoole_process::alarm(-1);// 清除定时器
}
});
//100ms
swoole_process::alarm(100 * 1000);// 定时器,类似于定时触发,类似js 里面的setinterval()
协程
2.0开始执行,暂不深入了解
内存读写
内存锁—互斥锁
$lock = new swoole_lock(SWOOLE_MUTEX);
echo "[主进程]创建锁\n";
$lock->lock();
if (pcntl_fork() > 0){// 这个是主进程
sleep(1);
$lock->unlock();
}else{// 这个是子进程
echo "[子进程]等待锁\n";
$lock->lock();
echo "[子进程]获取锁\n";
$lock->unlock();
exit("[子进程]退出\n");
}
echo "[主进程]释放锁\n";
unset($lock);
sleep(1);
echo "[主进程]退出\n";
异步IO
DNS轮询
swoole_async_dns_lookup("www.baidu.com", function($host, $ip){
echo "{$host} : {$ip}\n";
});
异步读取
swoole_async_readfile(__DIR__."/server.php", function($filename, $content) {
echo "$filename: $content";
});
异步写入
$file_content = 'jingshan';
swoole_async_writefile('test.log', $file_content, function($filename) {
echo "wirte ok.\n";
}, $flags = 0);
异步事件
$fp = stream_socket_client("tcp://www.qq.com:80", $errno, $errstr, 30);
fwrite($fp,"GET / HTTP/1.1\r\nHost: www.qq.com\r\n\r\n");
swoole_event_add($fp, function($fp) {
$resp = fread($fp, 8192);
//socket处理完成后,从epoll事件中移除socket
//var_dump($resp);
swoole_event_del($fp);
fclose($fp);
});
echo "Finish\n"; //swoole_event_add不会阻塞进程,这行代码会顺序执行
异步mysql
// mysql异步客户端
$db = new swoole_mysql;
$server = array(
'host' => '192.168.50.145',
'user' => 'root',
'password' => 'flzx_3QC',
'database' => 'mysql',
'chatset' => 'utf8', //指定字符集
);
$db->connect($server, function ($db, $r) {
if ($r === false){
var_dump($db->connect_errno, $db->connect_error);
die;
}
$sql = 'show tables';
$db->query($sql, function(swoole_mysql $db, $r) {
global $s;
if ($r === false){
var_dump($db->error, $db->errno);
}
elseif ($r === true ){
var_dump($db->affected_rows, $db->insert_id);
}
var_dump($r);
$db->close();
});
});