Yar(yet another RPC framework)
作者是惠新宸,代表作有Yaf (Yet another framework),Yar(Yet another RPC framework) 以及Yac(Yet another Cache)、Taint等多个开源项目
-
是一个轻量级的并行的RPC框架,也是一个PHP扩展
- RPC(Remote Procedure Call Protocol)——远程过程调用协议,采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。
-
使用场景
- 传统的Web应用, 随着业务快速增长, 开发人员的流转,代码量会越来越大. 随着系统变复杂,就会出现 牵一发就会动全局的局面, 而新来的维护者,需要投入更多的时间成本去维护,长此以往,最后等待我们的就是代码重构。
- 那么此时,我们可以通过模块分离,解耦的方式把复杂的业务进行各种分模块,分别部署在不同机器上,或者将公共业务逻辑抽离出来,将之组成独立的服务Service应用 。
- 基于此, Yar的功能就显现出来了
yar的安装和简单使用 (git 地址:https://github.com/laruence/yar
)
## Requirement
根据官方要求,需要安装msgpack 和yar的PHP扩展,扩展安装时,应注意与PHP版本的兼容问题。
作者鸟哥的网站介绍:http://www.laruence.com/2012/09/15/2779.html
PHP官网介绍:http://php.net/manual/zh/book.yar.php
github官网介绍:https://github.com/laruence/yar
msgpack(二进制数据传输,配合yar用):http://pecl.php.net/package/msgpack yar下载地址:http://pecl.php.net/package/yar
- PHP 5.2+
- Curl
- Json
- Msgpack (Optional)
安装要求,要求安装一下的php扩展,这就是上一步的重要性!
pecl install yar
pecl install msgpack
里面涉及一些docker镜像的载入导出https://www.jb51.net/article/151949.htm
和容器的载入导出https://www.cnblogs.com/linjiqin/p/8608975.html
导入保存的镜像包文件 cat server.tar | docker import - guofu/server:v1.0
启动容器 sudo docker run -idt --name webserver2 -p 8881:80 -v /home/wwwroot/default/test/:/home/wwwroot/default/test/ 0fb4edb46ffc /bin/bash
- 简单使用
- 原生server端
class API {
/**
* the doc info will be generated automatically into service info page.
* @params
* @return
*/
public function foo1($parameter) {
return 'this parameter is'.$parameter;
}
public function foo2() {
return 'this is no parameter';
}
$service = new Yar_Server(new API());
$service->handle();// 启动HTTP RPC Server
- 原生 Client端
$client = new Yar_Client("http://192.168.1.131:8888"); // 如果有命名空间,则采用$client = new Yar_Client("http://host/api/");
/* the following setopt is optinal */
//Set timeout to 1s
$client->SetOpt(YAR_OPT_CONNECT_TIMEOUT, 1000);
//Set packager to JSON
$client->SetOpt(YAR_OPT_PACKAGER, "json");
/* call remote service */
$result = $client->foo2("parameter");
- 并行调用
- 串行程序:
只能被顺序执行的指令列表 - 并行程序
可以在并行的硬件上执行的并发程序。
- 串行程序:
//服务端代码
<?php
class Operator
{
/**
* Add two operands
* @param interge
* @return interge
*/
public function add($a, $b)
{
sleep(3);
return $this->_add($a, $b);
}
/**
* Sub
*/
public function sub($a, $b)
{
sleep(3);
return $a - $b;
}
/**
* Mul
*/
public function mul($a, $b)
{
sleep(3);
return $a * $b;
}
/**
* Protected methods will not be exposed
* @param interge
* @return interge
*/
protected function _add($a, $b)
{
return $a + $b;
}
}
$server = new Yar_Server(new Operator());
$server->handle();
//客户端代码
<?php
function callback($retval, $callinfo) {
echo "这是一个远程调用的返回, 调用的服务名是", $callinfo["method"],
". 调用的sequence是 " , $callinfo["sequence"] , "\n";
var_dump($retval);
}
function add($ret, $callinfo) {
echo "add";
//sequence 序列 //uri 调用的rpc // method //调用的方法 //$ret 返回的结果
echo $callinfo['method'] , " result: ", $ret , "\n";
}
function sub($ret, $callinfo) {
echo "sub";
//sequence 序列 //uri 调用的rpc // method //调用的方法 //$ret 返回的结果
echo $callinfo['method'] , " result: ", $ret , "\n";
}
function mul($ret, $callinfo) {
echo "mul";
//sequence 序列 //uri 调用的rpc // method //调用的方法 //$ret 返回的结果
echo $callinfo['method'] , " result: ", $ret , "\n";
}
// 注册一个异步调用 // 调用地址----api名称----参数--回调函数的名字
//并行的(异步的)远程服务调用, 不过这个调用请求不会被立即发出, 调用loop的时候才真正发送出去
/*
* uri
RPC 服务的 URI(http 或 tcp).
method
调用的服务名字(也就是服务方法名).
parameters
调用的参数.
callback
回调函数, 在远程服务的返回到达的时候被Yar调用, 从而可以处理返回内容.
*
* */
Yar_Concurrent_Client::call("http://127.0.0.1:8881/test/counter.php", "add", array(1, 2),"add");
Yar_Concurrent_Client::call("http://127.0.0.1:8881/test/counter.php", "sub", array(2, 1), "sub");
Yar_Concurrent_Client::call("http://127.0.0.1:8881/test/counter.php", "mul", array(2, 2), "mul");
// 发送所有注册的调用,等待返回,返回后 Yar 会调用 callback 回掉函数 //
//public static Yar_Concurrent_Client::loop ([ callable $callback [, callable $error_callback ]] )
Yar_Concurrent_Client::loop();
// 重置 call , 否则上面的 call 会调用 //
/*
Yar_Concurrent_Client::reset();
Yar_Concurrent_Client::call("http://127.0.0.1:8881/test/counter.php", "sub", array(6, 7), "callback");
Yar_Concurrent_Client::loop();
*/
- 异常处理
//Server.php
<?php
class Custom_Exception extends Exception {};
class API {
public function throw_exception($name) {
throw new Custom_Exception($name);
}
}
$service = new Yar_Server(new API());
$service->handle();
?>
//Client.php
<?php
$client = new Yar_Client("http://host/api.php");
try {
$client->throw_exception("client");
} catch (Yar_Server_Exception $e) {
var_dump($e->getType());
var_dump($e->getMessage());
}
- 在laravel中的使用
- 原理
Yar 远程调用的实现原理
实际呢,yar client 是通过__call这个魔术方法来实现远程调用的,在Yar_client类里面并没有任何方法,当我们在调用一个不存在的方式的时候,就会执行__call方法,这个在框架中非常常见。
Client需要远程调用的时候,先初始化数据,数据主要包括三个部分header, packager_name,request_body,然后根据配置中的方式从pack_list选择合适的序列化方式(msgpack,json,php),对request_body进行序列化。 然后进行传输,传输方式 同样是采用工厂方法,从已有的方式中选择Curl/socket方式进行传输数据,传输数据的过程实际就是发送一个http请求。
服务器端监听端口,底层网络实现都利用现有的nginx, php-fpm。在PHP代码中,实现的地方实例化一个类,然后根据request的内容,解析body,然后去初始化header, 获得packager_name,根据packer_name解析yar_body的内容。
-
Yar 协议分析
以下是每次request和response的数据情况。以request为例传输的body数据包含三个部分,82byte的头部,packager_name,和经过packager_name序列化的string。当这段数据传到server端,按照固定的结构解析出来即可。
image.png
在 yar 中规定的传输协议如下图所示,请求体为82个字节的yar_header_t
和8字节的打包名称和请求实体yar_request_t
,在yar_header_t
里面用body_len
记录8字节的打包名称+请求实体的长度;返回体类似,只是实体内容的结构体稍微不同,在reval
里面才是实际最后客户端需要的结果。
整个传输以二进制流的形式传送。
传输
yar目前有两种传输方式,这个在之前提到过curl和socket方式。因为socket实际没有用过所以这里主要介绍curl。
curl的模块主要以来底层的CURL模块,主要封装了如下方法。其中multi系列主要是解决并行请求的方案。
php_yar_curl_open
这个方法主要是用CURL创建一个连接对象。这里有个复用的机制,当options &
YAR_PROTOCOL_PERSISTENT(0x1) 为true的时候,会去已经使用的连接中找一下,如果存在并且没有在用则复用。
php_yar_curl_send
这个函数主要是把request需要的数据,转成字符串,然后放在请求的postfield里面。
php_yar_curl_exec
将postfield内容,通过http请求发送出去。获得返回的内容。然后按照上述内容解析出结果。
RPC 采用客户端/服务器模式。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息的到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。
https://segmentfault.com/a/1190000010158190
https://blog.51cto.com/9681602/2287566
附录:打包器类型讲解:
如前问所述,packager目前的实现有3种:
msgpack
php
json
// packager\php.c
int php_yar_packager_php_pack(const yar_packager_t *self, zval *pzval, smart_str *buf, char **msg) /* {{{ */ {
php_serialize_data_t var_hash;
PHP_VAR_SERIALIZE_INIT(var_hash);
php_var_serialize(buf, pzval, &var_hash);
PHP_VAR_SERIALIZE_DESTROY(var_hash);
return 1;
} /* }}} */
zval * php_yar_packager_php_unpack(const yar_packager_t *self, char *content, size_t len, char **msg, zval *rret) /* {{{ */ {
zval *return_value;
const unsigned char *p;
php_unserialize_data_t var_hash;
p = (const unsigned char*)content;
PHP_VAR_UNSERIALIZE_INIT(var_hash);
if (!php_var_unserialize(rret, &p, p + len, &var_hash)) {
zval_ptr_dtor(rret);
PHP_VAR_UNSERIALIZE_DESTROY(var_hash);
spprintf(msg, 0, "unpack error at offset %ld of %ld bytes", (long)((char*)p - content), len);
return NULL;
}
PHP_VAR_UNSERIALIZE_DESTROY(var_hash);
return_value = rret;
return return_value;
} /* }}} */
该方式实际上就是就是我们平常使用的serialize(),unserialize函数。
json
//packager\json.c
int php_yar_packager_json_pack(const yar_packager_t *self, zval *pzval, smart_str *buf, char **msg) /* {{{ */ {
#if ((PHP_MAJOR_VERSION == 5) && (PHP_MINOR_VERSION < 3))
php_json_encode(buf, pzval);
#else
php_json_encode(buf, pzval, 0); /* options */
#endif
return 1;
} /* }}} */
zval * php_yar_packager_json_unpack(const yar_packager_t *self, char *content, size_t len, char **msg, zval *rret) /* {{{ */ {
zval *return_value;
php_json_decode(rret, content, len, 1, 512);
return_value = rret;
return return_value;
} /* }}} */
json方式使用json拓展的json_encode(),json_decode()对变量进行序列化操作
msgpack
//packager\msgpack.c
int php_yar_packager_msgpack_pack(const yar_packager_t *self, zval *pzval, smart_str *buf, char **msg) /* {{{ */ {
php_msgpack_serialize(buf, pzval);
return 1;
} /* }}} */
zval * php_yar_packager_msgpack_unpack(const yar_packager_t *self, char *content, size_t len, char **msg, zval *rret) /* {{{ */ {
zval *return_value;
ZVAL_NULL(rret);
php_msgpack_unserialize(rret, content, len);
return_value = rret;
return return_value;
} /* }}} */
而yar_packager_msgpack使用msgpack拓展的php_msgpack_serialize()和php_msgpack_unserialize()函数对变量进行序列化。3种序列化方式中该种传输效率最高,空间使用最小, 使用该方式需要自行安装msgpack-php拓展并在编译yar时候使用--enable-msgpack参数。