swoole 多进程共享数据

进程作为程序执行过程中资源分配的基本单位,拥有独立的地址空间,同一进程的线程可以共享本进程的全局变量,静态变量等数据和地址空间,但进程之间资源相互独立.由于PHP语言不支持多线程,因此Swoole使用多进程模式,再多进程模式下就存在进程内存隔离,进程间通信与数据共享问题.

swoole中master主进程会创建manager管理进程和reactor线程,真正的工作进程为worker进程. manager是创建和管理worker进程,reactor进程测试监听socket,接受数据任务,发送给worker进程去工作,因此所有业务逻辑最终都是在worker进程中进行的,worker进程之间的数据共享与通信必不可少.

swoole中 设置选项worker_num设置 启动的worker进程数,默认设置为CPU核数

 $server = new swoole_server('127.0.0.1',9898);
 $server->set(array(
     'worker_num' => 4,   //设置启动的Worker进程数。
 ));

如上面说描述,进程存在进程隔离:

 $fds = array();
 $server->on('connect', function ($server, $fd){
     echo "connection open: {$fd}\n";
     global $fds;
     $fds[] = $fd;
     var_dump($fds);
 });

$fds虽然是全局变量,但是只在但前的进程内有效,swoole服务器底层会创建多个worker进程,此处打印出来的只有部分连接的fd,本文讲简述两种解决方案的简单示例:

1.外部存储服务 : Redis

作为内存数据库redis 无太多IO等待,并且读写速度快

示例代码:以简易聊天室websocket服务 swoole_websocket_server为例

$ws = new swoole_websocket_server("0.0.0.0", 9999);
    $redis = new \Redis();
    $redis->connect('127.0.0.1', 6379);
    $ws->set(array(
        'daemonize' => true,
        'worker_num'      => 1,
    ));
//监听WebSocket连接打开事件
    $ws->on('open', function ($ws, $request) use($redis) {
        var_dump($request->fd, $request->get, $request->server);
        //记录连接
        $redis->sAdd('fd',$request->fd);
        $count = $redis->sCard('fd');
        $ws->push($request->fd, 'hello, welcome ☺                     当前'.$count.'人连接在线');
    });
//监听WebSocket消息事件
    $ws->on('message', function ($ws, $frame) use($redis) {
        $fds  = $redis->sMembers('fd');
        $data = json_decode($frame->data,true);
        if($data['type'] ==1 ){
            $redis->setex($frame->fd,'7200',json_encode(['fd'=>$frame->fd,'user'=>$data['user']]));
            //通知所有用户新用户上线
            $fds = $redis->sMembers('fd');$users=[];
            $i=0;
            foreach ($fds as $fd_on){
                $info = $redis->get($fd_on);
                $is_time = $redis->ttl($fd_on);
                if($is_time > 0){
                    $users[$i]['fd']   = $fd_on;
                    $users[$i]['name'] = json_decode($info,true)['user'];
//                    $users[$i]['name'] = $is_time;
                }else{
                    $redis->sRem('fd',$fd_on);
                }
                $i++;
            }
            foreach ($fds as $fd_on){
                $message = date('Y-m-d H:i:s',time())."<br>欢迎 <b style='color: darkmagenta ;'>".$data['user']."</b> 进入聊天室<br>";
                $push_data = ['message'=>$message,'users'=>$users];
                $ws->push($fd_on,json_encode($push_data));
                $i++;
            }
        }else if($data['type'] ==2){
            if($data['to_user'] == 'all'){
                foreach ($fds as $fd){
                    if($frame->fd == $fd){
                        $message = "<p style='text-align: right'>".date('Y-m-d H:i:s',time())."<br><b style='color:blue;'> 寡人say:</b>  ".$data['msg']."<br></p>";
                    }else{
                        $message = date('Y-m-d H:i:s',time())."<br><b style='color: crimson'>".$data['from_user']." say:</b>  ".$data['msg']."<br>";
                    }
                    $push_data = ['message'=>$message];
                    $ws->push($fd,json_encode($push_data));
                }
            }
        }
        echo "Message: {$frame->data}\n";
        //循环所有连接人发送内容
        //foreach($ws->connections as $key => $fd) {
        //$user_message = $frame->data;
        //$ws->push($fd, $frame->fd.'say:'.$user_message);
        //}
    });
//监听WebSocket连接关闭事件
    $ws->on('close', function ($ws, $fd) use ($redis){
        $redis->sRem('fd',$fd);
        $fds = $redis->sMembers('fd');
        $i=0;$users=[];
        foreach ($fds as $fd_on){
            $info = $redis->get($fd_on);
            $is_time = $redis->ttl($fd_on);
            if($is_time){
                $users[$i]['fd']   = $fd_on;
                $users[$i]['name'] = json_decode($info,true)['user'];
            }else{
                $redis->sRem('fd',$fd_on);
            }
            $i++;
        }
        foreach ($fds as $fd_on){
            $user = json_decode($redis->get($fd),true)['user'];
            $message = date('Y-m-d H:i:s',time())."<br><b style='color: blueviolet'>".$user."</b> 离开聊天室了<br>";
            $push_data = ['message'=>$message,'users'=>$users];
            $ws->push($fd_on,json_encode($push_data));
        }
        echo "client-{$fd} is closed\n";
    });
    $ws->start();

2.共享内存拓展:swoole_table

swoole_table是swoole官方提供的基于共享内存和锁实现的超高性能冰饭数据结构.swoole_table在swoole1.7.5版本后可用.

目前swoole只支持3种类型:

swoole_table::TYPE_INT 整形字段

swoole_table::TYPE_FLOAT浮点字段

swoole_table::TYPE_STRING 字符串字段

函数方法:

column() :给内存表增加一列 参数:字段名,字段类型,字节数

$table->column('id', swoole_table::TYPE_INT, 4);

create():基于前一步对表结构的创建,执行创建表.

set() :设置行的数据(key-value的方式) 参数: 数据的key,数据的值(必须数组,键名必须与字段定义的$name相同)

$table->set($fd, ['id'=>1]);

get() :获取一行数据 参数:数据的key

$table->get($fd);

del() :删除一行数据 参数:数据的key

$table->del($fd);

lock():锁定整个表

unlock():释放锁

lock/unlock 必须成对出现,否则会发生死锁.

示例代码: 还是上面的websocket服务为例

class WebSocketServer {
    private $server;
    public function __construct()
    {
        $this->server = new swoole_websocket_server("0.0.0.0",9988);
        $this->server->set(array(
           'daemonize'       => true,
            'worker_num'      => 4,
            'task_worker_num' => 4
        ));
        $fd_table = new swoole_table( 1024 );
        $fd_table->column( "user",swoole_table::TYPE_STRING, 30 );
        $fd_table->column( "time", swoole_table::TYPE_STRING, 20 );
        $fd_table->create();
        $user_table = new swoole_table(1024);
        $user_table->column("fd",swoole_table::TYPE_INT,8);
        $user_table->create();
        $this->server->fd = $fd_table;
        $this->server->user = $user_table;
        //启动开始
        $this->server->on('Start',[$this,'onStart']);
        //与onStart同级
        $this->server->on('workerStart',[$this,'onWorkerStart']);
        //webSocket open 连接触发回调
        $this->server->on('open',[$this,'onOpen']);
        //webSocket send 发送触发回调
        $this->server->on('message', [$this, 'onMessage']);
        //webSocket close 关闭触发回调
        $this->server->on('Close', [$this, 'onClose']);
        //tcp连接 触发 在 webSocket open 之前回调
        $this->server->on('Connect', [$this, 'onConnect']);
        //tcp 模式下(eg:telnet ) 发送信息才会触发  webSocket 模式下没有触发
        $this->server->on('Receive', [$this, 'onReceive']);
        // task_worker进程处理任务的回调   处理比较耗时的任务
        $this->server->on('Task', [$this, 'onTask']);
        // task_worker进程处理任务结束的回调
        $this->server->on('Finish', [$this, 'onFinish']);
        // 服务开启
        $this->server->start();
    }
    public function createTable(){
    }
    public function onStart( $server)
    {
        echo "Start\n";
    }
    public function onWorkerStart($server,$worker_id)
    {
        //判断是worker进程还是 task_worker进程 echo 次数 是worker_num+task_worker_num
        if($worker_id<$server->setting['worker_num']){
            echo  'worder'.$worker_id."\n";
        }else{
            echo  'task_worker'.$worker_id."\n";
        }
        //     echo "workerStart{$worker_id}\n";
    }
    public function onOpen( $server,$request)
    {
        $this->server->fd->set($request->fd,['user'=>'']);
        echo "server: handshake success with fd{$request->fd}\n";
        $count =count($server->connections);
        $server->push($request->fd, 'hello, welcome ☺                     当前'.$count.'人连接在线');
    }
    public function onMessage( $server,$frame)
    {
        echo "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}\n";
        $data = json_decode($frame->data,true);
        if($data['type'] ==1 ){
            $server->fd->set($frame->fd,['user'=>$data['user']]);
            //通知所有用户新用户上线
            foreach($server->connections as $key => $fd) {
                $server->push($fd, "欢迎 <b style='color: darkmagenta ;'>".$data['user']."</b> 进入聊天室");
            }
        }else if($data['type'] ==2){
            if($data['to_user'] == 'all'){
                foreach($server->connections as $key => $fd) {
                    $server->push($fd, "<b style='color: crimson'>".$data['from_user']." say:</b>  ".$data['msg']);
                }
            }
        }
    }
    public function onConnect( $server, $fd, $from_id ) {
        echo "Client {$fd} connect\n";
        echo "{$from_id}\n";
    }
    public function onReceive( $server, $fd, $from_id, $data ) {
        echo "Get Message From Client {$fd}:{$data}\n";
        // send a task to task worker.
//        $param = array(
//            'fd' => $fd
//        );
//        $server->task( json_encode( $param ) );
        echo "Continue Handle Worker\n";
    }
    public function onClose($server, $fd)
    {
        echo "Client {$fd} close connection\n";
        foreach($server->connections as $key => $on_fd) {
            $user = $server->fd->get($fd)['user'];
            $server->push($on_fd, "<b style='color: blueviolet'>".$user."</b> 离开聊天室了");
        }
    }
    public function onTask($server, $task_id, $from_id, $data)
    {
        echo "This Task {$task_id} from Worker {$from_id}\n";
        echo "Data: {$data}\n";
        for ($i = 0; $i < 10; $i++) {
            sleep(1);
            echo "Taks {$task_id} Handle {$i} times...\n";
        }
        $fd = json_decode($data, true)['fd'];
        echo  "Data in Task {$task_id}";
//        $serv->send($fd, "Data in Task {$task_id}");
        return "Task {$task_id}'s result";
    }
    public function onFinish($server,$task_id, $data) {
        echo "Task {$task_id} finish\n";
        echo "Result: {$data}\n";
    }
}
new WebSocketServer();
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容