RPC框架-yar的简单使用和原理分析

Yar(yet another RPC framework)

  • 作者是惠新宸,代表作有Yaf (Yet another framework),Yar(Yet another RPC framework) 以及Yac(Yet another Cache)、Taint等多个开源项目

  • 是一个轻量级的并行的RPC框架,也是一个PHP扩展

    • RPC(Remote Procedure Call Protocol)——远程过程调用协议,采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。
  • 使用场景

    • 传统的Web应用, 随着业务快速增长, 开发人员的流转,代码量会越来越大. 随着系统变复杂,就会出现 牵一发就会动全局的局面, 而新来的维护者,需要投入更多的时间成本去维护,长此以往,最后等待我们的就是代码重构。
    • 那么此时,我们可以通过模块分离,解耦的方式把复杂的业务进行各种分模块,分别部署在不同机器上,或者将公共业务逻辑抽离出来,将之组成独立的服务Service应用 。
    • 基于此, Yar的功能就显现出来了
  • yar的安装和简单使用 (git 地址:https://github.com/laruence/yar

## Requirement
根据官方要求,需要安装msgpack 和yar的PHP扩展,扩展安装时,应注意与PHP版本的兼容问题。

作者鸟哥的网站介绍:http://www.laruence.com/2012/09/15/2779.html

PHP官网介绍:http://php.net/manual/zh/book.yar.php

github官网介绍:https://github.com/laruence/yar

msgpack(二进制数据传输,配合yar用):http://pecl.php.net/package/msgpack yar下载地址:http://pecl.php.net/package/yar
- PHP 5.2+
- Curl
- Json
- Msgpack (Optional)
安装要求,要求安装一下的php扩展,这就是上一步的重要性!
pecl install yar
pecl install msgpack

里面涉及一些docker镜像的载入导出https://www.jb51.net/article/151949.htm
和容器的载入导出https://www.cnblogs.com/linjiqin/p/8608975.html

导入保存的镜像包文件  cat server.tar | docker import - guofu/server:v1.0
启动容器 sudo docker run -idt --name webserver2 -p 8881:80 -v /home/wwwroot/default/test/:/home/wwwroot/default/test/ 0fb4edb46ffc /bin/bash

  • 简单使用
    • 原生server端
class API {

/**

* the doc info will be generated automatically into service info page.

* @params

* @return

*/

public function foo1($parameter) {

return 'this parameter is'.$parameter;

}

public function foo2() {

return 'this is no parameter';

}


$service = new Yar_Server(new API());

$service->handle();// 启动HTTP RPC Server

  • 原生 Client端
$client = new Yar_Client("http://192.168.1.131:8888"); // 如果有命名空间,则采用$client = new Yar_Client("http://host/api/");

/* the following setopt is optinal */

//Set timeout to 1s
$client->SetOpt(YAR_OPT_CONNECT_TIMEOUT, 1000);

//Set packager to JSON
$client->SetOpt(YAR_OPT_PACKAGER, "json");

/* call remote service */

$result = $client->foo2("parameter");
  • 并行调用
    • 串行程序:
      只能被顺序执行的指令列表
    • 并行程序
      可以在并行的硬件上执行的并发程序。
//服务端代码
<?php


class Operator
{

    /**
     * Add two operands
     * @param interge
     * @return interge
     */
    public function add($a, $b)
    {
        sleep(3);
        return $this->_add($a, $b);

    }

    /**
     * Sub
     */
    public function sub($a, $b)
    {
        sleep(3);
        return $a - $b;
    }

    /**
     * Mul
     */
    public function mul($a, $b)
    {
        sleep(3);
        return $a * $b;
    }

    /**
     * Protected methods will not be exposed
     * @param interge
     * @return interge
     */
    protected function _add($a, $b)
    {
        return $a + $b;
    }
}

$server = new Yar_Server(new Operator());
$server->handle();

//客户端代码
<?php
function callback($retval, $callinfo) {

        echo "这是一个远程调用的返回, 调用的服务名是", $callinfo["method"],
        ". 调用的sequence是 " , $callinfo["sequence"] , "\n";
        var_dump($retval);
    
}

function add($ret, $callinfo) {
    echo "add";
//sequence  序列    //uri 调用的rpc  // method  //调用的方法   //$ret 返回的结果
    echo $callinfo['method'] , " result: ", $ret , "\n";
}


function sub($ret, $callinfo) {
    echo "sub";
//sequence  序列    //uri 调用的rpc  // method  //调用的方法   //$ret 返回的结果
    echo $callinfo['method'] , " result: ", $ret , "\n";
}


function mul($ret, $callinfo) {
    echo "mul";
//sequence  序列    //uri 调用的rpc  // method  //调用的方法   //$ret 返回的结果
    echo $callinfo['method'] , " result: ", $ret , "\n";
}



// 注册一个异步调用 //   调用地址----api名称----参数--回调函数的名字
//并行的(异步的)远程服务调用, 不过这个调用请求不会被立即发出, 调用loop的时候才真正发送出去
/*
 * uri
RPC 服务的 URI(http 或 tcp).

method
调用的服务名字(也就是服务方法名).

parameters
调用的参数.

callback
回调函数, 在远程服务的返回到达的时候被Yar调用, 从而可以处理返回内容.
 *
 * */
Yar_Concurrent_Client::call("http://127.0.0.1:8881/test/counter.php", "add", array(1, 2),"add");
Yar_Concurrent_Client::call("http://127.0.0.1:8881/test/counter.php", "sub", array(2, 1), "sub");
Yar_Concurrent_Client::call("http://127.0.0.1:8881/test/counter.php", "mul", array(2, 2), "mul");

// 发送所有注册的调用,等待返回,返回后 Yar 会调用 callback 回掉函数 //
//public static Yar_Concurrent_Client::loop ([ callable $callback [, callable $error_callback ]] )
Yar_Concurrent_Client::loop();

// 重置 call , 否则上面的 call 会调用 //
/*
Yar_Concurrent_Client::reset();
Yar_Concurrent_Client::call("http://127.0.0.1:8881/test/counter.php", "sub", array(6, 7), "callback");
Yar_Concurrent_Client::loop();
*/

  • 异常处理
//Server.php
<?php
class Custom_Exception extends Exception {};

class API {
    public function throw_exception($name) {
        throw new Custom_Exception($name);
    }
}

$service = new Yar_Server(new API());
$service->handle();
?>

//Client.php
<?php
$client = new Yar_Client("http://host/api.php");

try {
    $client->throw_exception("client");
} catch (Yar_Server_Exception $e) {
    var_dump($e->getType());
    var_dump($e->getMessage());
}

  • 在laravel中的使用
  • 原理
    Yar 远程调用的实现原理
    实际呢,yar client 是通过__call这个魔术方法来实现远程调用的,在Yar_client类里面并没有任何方法,当我们在调用一个不存在的方式的时候,就会执行__call方法,这个在框架中非常常见。

Client需要远程调用的时候,先初始化数据,数据主要包括三个部分header, packager_name,request_body,然后根据配置中的方式从pack_list选择合适的序列化方式(msgpack,json,php),对request_body进行序列化。 然后进行传输,传输方式 同样是采用工厂方法,从已有的方式中选择Curl/socket方式进行传输数据,传输数据的过程实际就是发送一个http请求。

服务器端监听端口,底层网络实现都利用现有的nginx, php-fpm。在PHP代码中,实现的地方实例化一个类,然后根据request的内容,解析body,然后去初始化header, 获得packager_name,根据packer_name解析yar_body的内容。

  • Yar 协议分析
    以下是每次request和response的数据情况。以request为例传输的body数据包含三个部分,82byte的头部,packager_name,和经过packager_name序列化的string。当这段数据传到server端,按照固定的结构解析出来即可。


    image.png

在 yar 中规定的传输协议如下图所示,请求体为82个字节的yar_header_t和8字节的打包名称和请求实体yar_request_t,在yar_header_t里面用body_len记录8字节的打包名称+请求实体的长度;返回体类似,只是实体内容的结构体稍微不同,在reval里面才是实际最后客户端需要的结果。

整个传输以二进制流的形式传送。

传输

yar目前有两种传输方式,这个在之前提到过curl和socket方式。因为socket实际没有用过所以这里主要介绍curl。

curl的模块主要以来底层的CURL模块,主要封装了如下方法。其中multi系列主要是解决并行请求的方案。

image.png

php_yar_curl_open

这个方法主要是用CURL创建一个连接对象。这里有个复用的机制,当options &

YAR_PROTOCOL_PERSISTENT(0x1) 为true的时候,会去已经使用的连接中找一下,如果存在并且没有在用则复用。

php_yar_curl_send

这个函数主要是把request需要的数据,转成字符串,然后放在请求的postfield里面。

php_yar_curl_exec

将postfield内容,通过http请求发送出去。获得返回的内容。然后按照上述内容解析出结果。

RPC 采用客户端/服务器模式。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息的到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。
https://segmentfault.com/a/1190000010158190

https://blog.51cto.com/9681602/2287566

附录:打包器类型讲解:
如前问所述,packager目前的实现有3种:

msgpack
php
json

// packager\php.c
int php_yar_packager_php_pack(const yar_packager_t *self, zval *pzval, smart_str *buf, char **msg) /* {{{ */ {
    php_serialize_data_t var_hash;

    PHP_VAR_SERIALIZE_INIT(var_hash);
    php_var_serialize(buf, pzval, &var_hash);
    PHP_VAR_SERIALIZE_DESTROY(var_hash);

    return 1;
} /* }}} */

zval * php_yar_packager_php_unpack(const yar_packager_t *self, char *content, size_t len, char **msg, zval *rret) /* {{{ */ {
    zval *return_value;
    const unsigned char *p;
    php_unserialize_data_t var_hash;
    p = (const unsigned char*)content;

    PHP_VAR_UNSERIALIZE_INIT(var_hash);
    if (!php_var_unserialize(rret, &p, p + len,  &var_hash)) {
        zval_ptr_dtor(rret);
        PHP_VAR_UNSERIALIZE_DESTROY(var_hash);
        spprintf(msg, 0, "unpack error at offset %ld of %ld bytes", (long)((char*)p - content), len);
        return NULL;
    }
    PHP_VAR_UNSERIALIZE_DESTROY(var_hash);

    return_value = rret;
    return return_value;
} /* }}} */

该方式实际上就是就是我们平常使用的serialize(),unserialize函数。
json
//packager\json.c
int php_yar_packager_json_pack(const yar_packager_t *self, zval *pzval, smart_str *buf, char **msg) /* {{{ */ {
#if ((PHP_MAJOR_VERSION == 5) && (PHP_MINOR_VERSION < 3))
    php_json_encode(buf, pzval);
#else
    php_json_encode(buf, pzval, 0); /* options */
#endif

    return 1;
} /* }}} */

zval * php_yar_packager_json_unpack(const yar_packager_t *self, char *content, size_t len, char **msg, zval *rret) /* {{{ */ {
    zval *return_value;

    php_json_decode(rret, content, len, 1, 512);

    return_value = rret;
    return return_value;
} /* }}} */
json方式使用json拓展的json_encode(),json_decode()对变量进行序列化操作

msgpack
//packager\msgpack.c
int php_yar_packager_msgpack_pack(const yar_packager_t *self, zval *pzval, smart_str *buf, char **msg) /* {{{ */ {
    php_msgpack_serialize(buf, pzval);
    return 1;
} /* }}} */

zval * php_yar_packager_msgpack_unpack(const yar_packager_t *self, char *content, size_t len, char **msg, zval *rret) /* {{{ */ {
    zval *return_value;
    ZVAL_NULL(rret);
    php_msgpack_unserialize(rret, content, len);
    return_value = rret;
    return return_value;
} /* }}} */
而yar_packager_msgpack使用msgpack拓展的php_msgpack_serialize()和php_msgpack_unserialize()函数对变量进行序列化。3种序列化方式中该种传输效率最高,空间使用最小, 使用该方式需要自行安装msgpack-php拓展并在编译yar时候使用--enable-msgpack参数。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容