[原]PHP-yar拓展源码解读六-transport篇

传输器结构

yar底层用一个_yar_transport_interface结构表示一个传输器,处理网络IO相关事宜。

typedef struct _yar_transport_interface {
    void *data;//这个变量在并发调用章节才会讲到,暂时不用管
    int  (*open)(struct _yar_transport_interface *self, zend_string *address, long options, char **msg);
    int  (*send)(struct _yar_transport_interface *self, struct _yar_request *request, char **msg);
    struct _yar_response * (*exec)(struct _yar_transport_interface *self, struct _yar_request *request);
    int  (*setopt)(struct _yar_transport_interface *self, long type, void *value, void *addition);
    int  (*calldata)(struct _yar_transport_interface *self, yar_call_data_t *calldata);
    void (*close)(struct _yar_transport_interface *self);
} yar_transport_interface_t;

yar_transport_interface_tyar_transport_t生成

typedef struct _yar_transport {
    const char *name;
    struct _yar_transport_interface * (*init)();
    void (*destroy)(yar_transport_interface_t *self);
    yar_transport_multi_t *multi;
} yar_transport_t;

两者关系如图。


传输器类图.png

前面提过Yar是典型的OO风格的C代码。虽然说是C语言的实现,但要理解这里几个相关结构必须要用面向对象的眼光。

yar_transport_t结构体类型相当于一个工厂接口,声明了一个工厂骨架。
yar_transport_t类型的两个变量相当于 两个 工厂接口的实现对象,不同的变量使用不同的函数指针形成了多态。yar_transport_t管理着yar_transport_interface结构体变量的构造和析构,产生具体的yar_transport_interface类型的实例,每个yar_transport_interface实例都有自己的成员变量。

还有一种相似的理解思路。
yar_transport_t是一个抽象类。
yar_transport_t类型的两个变量是该抽象类的两个实现子类,yar_transport_t 声明的方法都是类方法和类变量,对应PHP中的 __construct()__destruct()
yar_transport_interface则是yar_transport_t这个抽象类的实例,其结构体上的成员对应过的是实例的成员变量和成员方法。

但无论是哪一种,理解的核心都是以下两点:
1.yar_transport_interface是一个传输器对象,有自己的独立数据和状态。
2.yar_transport_t类型的变量和yar_transport_t类型构成了多态,用于生成不同类型的yar_transport_interface对象。
后文统一使用第一种方式去描述yar的传输器模块。

传输器工厂类

yar_transport_t->name表示传输器工厂的名字,下文会用传输器工厂的名字借代该工厂构造的传输器。
Yar目前支持HttpTcpUnixSock(仅单机内调用可用)三种传输方式/协议,Http目前的传输器使用Curl实现,传输器(工厂)的名字为curl,另外两种两种使用socket实现,传输器名字为sock,不过由于PHP目前没有Tcp/UnixSock的Yar Rpc Server,所以这个sock传输器很少会用到。
init()destroy()是函数指针,用于构造和销毁 yar_transport_interface_t实例。
multi()是另外一个工厂,用于构造一个并发调度器实例,后续并发调用章节我们也会提到。

传输器类

传输器有sockcurl两种,前者应用场景较少。
但是考虑其比较简单,没有并发调用模式,这里将以其为示例先讲述传输器类中同步RPC调用模式的相关方法 ,至于curl传输器以及 并发RPC调用 在后面会用另外一篇文章单独分析。实际上两种传输器的接口方法语义和调用方式都有一定差异。

Open()

//socket.c
int php_yar_socket_open(yar_transport_interface_t *self, zend_string *address, long options, char **err_msg) /* {{{ */ {
    yar_socket_data_t *data = (yar_socket_data_t *)self->data;
    struct timeval tv;
    php_stream *stream = NULL;
    zend_string *errstr = NULL;
    char *persistent_key = NULL;
    int err;

    tv.tv_sec = (ulong)(YAR_G(connect_timeout) / 1000);
    tv.tv_usec = (ulong)((YAR_G(connect_timeout) % 1000)? (YAR_G(connect_timeout) % 1000) * 1000 : 0);
     //直到我看到这块源码,我才知道原来Yar是支持长连接的,长连接开关的来源为ini文件中的`yar.allow_persistent`,这个配置在php.net上也是找不到的,curl传输器同样支持该配置。
    if (options & YAR_PROTOCOL_PERSISTENT) {
        data->persistent = 1;
        spprintf(&persistent_key, 0, "yar_%s", ZSTR_VAL(address));
    } else {
        data->persistent = 0;
    }

    //发起连接或根据uri获取可复用的旧连接
    stream = php_stream_xport_create(ZSTR_VAL(address), ZSTR_LEN(address), 0, STREAM_XPORT_CLIENT|STREAM_XPORT_CONNECT, persistent_key, &tv, NULL, &errstr, &err);

    if (persistent_key) {
        efree(persistent_key);
    }

    if (stream == NULL) {
        spprintf(err_msg, 0, "Unable to connect to %s (%s)", ZSTR_VAL(address), strerror(errno));
        efree(errstr);
        return 0;
    }
    //sock传输器是同步阻塞的,不支持并行调用
    php_stream_set_option(stream, PHP_STREAM_OPTION_BLOCKING, 0, NULL);

#if ZEND_DEBUG
    stream->__exposed++;
#endif

    data->stream = stream;
        
    return 1;
} /* }}} */

yar_transport_interface->open()用于发起连接或初始化网络IO相关的资源。
虽然这个socket传输器的名字叫'sock',但是socket方式也没有直接使用socket()系函数,而是使用了PHP提供的 机制封装的统一接口,简化相关通信细节。

Send()

//socket.c
int php_yar_socket_send(yar_transport_interface_t* self, yar_request_t *request, char **msg) /* {{{ */ {
    fd_set rfds;
    zend_string *payload;
    struct timeval tv;
    int ret = -1, fd, retval;
    char buf[SEND_BUF_SIZE];
    yar_header_t header = {0};
    //self变量相当于python方法的self参数,也类似于手动传递一个PHP 的$this指针供方法使用,面向对象风格的C常见做法。
    yar_socket_data_t *data = (yar_socket_data_t *)self->data;

    FD_ZERO(&rfds);
    //php的流转文件描述符
    if (SUCCESS == php_stream_cast(data->stream, PHP_STREAM_AS_FD_FOR_SELECT | PHP_STREAM_CAST_INTERNAL, (void*)&fd, 1) && fd >= 0) {
        PHP_SAFE_FD_SET(fd, &rfds);
    } else {
        spprintf(msg, 0, "Unable cast socket fd form stream (%s)", strerror(errno));
        return 0;
    }

    //request变量序列化
    if (!(payload = php_yar_request_pack(request, msg))) {
        return 0;
    }

    DEBUG_C(ZEND_ULONG_FMT": pack request by '%.*s', result len '%ld', content: '%.32s'", 
            request->id, 7, ZSTR_VAL(payload), ZSTR_LEN(payload), ZSTR_VAL(payload) + 8);

    //构建header变量
    /* for tcp/unix RPC, we need another way to supports auth */
    php_yar_protocol_render(&header, request->id, "Yar PHP Client", NULL, ZSTR_LEN(payload), data->persistent? YAR_PROTOCOL_PERSISTENT : 0);

    memcpy(buf, (char *)&header, sizeof(yar_header_t));

    tv.tv_sec = (ulong)(YAR_G(timeout) / 1000);
    tv.tv_usec = (ulong)((YAR_G(timeout) % 1000)? (YAR_G(timeout) & 1000) * 1000 : 0);

    //php_select()是select()的别名宏,等待socket文件描述符可写
    retval = php_select(fd+1, NULL, &rfds, NULL, &tv);

    if (retval == -1) {
        zend_string_release(payload);
        spprintf(msg, 0, "select error '%s'", strerror(errno));
        return 0;
    } else if (retval == 0) {
        zend_string_release(payload);
        spprintf(msg, 0, "select timeout '%ld' seconds reached", YAR_G(timeout));
        return 0;
    }

    //通过buff缓冲区循环发送所有报文(协议头+载荷)
    if (PHP_SAFE_FD_ISSET(fd, &rfds)) {
        size_t bytes_left = 0, bytes_sent = 0;

        if (ZSTR_LEN(payload) > (sizeof(buf) - sizeof(yar_header_t))) {
            memcpy(buf + sizeof(yar_header_t), ZSTR_VAL(payload), sizeof(buf) - sizeof(yar_header_t));
            if ((ret = php_stream_xport_sendto(data->stream, buf, sizeof(buf), 0, NULL, 0)) < 0) {
                zend_string_release(payload);
                spprintf(msg, 0, "unable to send data");
                return 0;
            }
        } else {
            memcpy(buf + sizeof(yar_header_t), ZSTR_VAL(payload), ZSTR_LEN(payload));
            if ((ret = php_stream_xport_sendto(data->stream, buf, sizeof(yar_header_t) + ZSTR_LEN(payload), 0, NULL, 0)) < 0) {
                zend_string_release(payload);
                spprintf(msg, 0, "unable to send data");
                return 0;
            }
        }

        bytes_sent = ret - sizeof(yar_header_t);
        bytes_left = ZSTR_LEN(payload) - bytes_sent;

wait_io:
        if (bytes_left) {
            retval = php_select(fd+1, NULL, &rfds, NULL, &tv);

            if (retval == -1) {
                zend_string_release(payload);
                spprintf(msg, 0, "select error '%s'", strerror(errno));
                return 0;
            } else if (retval == 0) {
                zend_string_release(payload);
                spprintf(msg, 0, "select timeout %ldms reached", YAR_G(timeout));
                return 0;
            }

            if (PHP_SAFE_FD_ISSET(fd, &rfds)) {
                if ((ret = php_stream_xport_sendto(data->stream, ZSTR_VAL(payload) + bytes_sent, bytes_left, 0, NULL, 0)) > 0) {
                    bytes_left -= ret;
                    bytes_sent += ret;
                }
            }
            goto wait_io;
        }
    }

    zend_string_release(payload);

    return ret < 0? 0 : 1;
} /* }}} */

yar_transport_interface->send()接受一个request参数,用于实际的IO发包。
备注:对于curl传输器,由于libcurl的api限制因为其send()并未进行实际的发包,实际上发包收包都在exec()中

Exec()

//socket.c
yar_response_t * php_yar_socket_exec(yar_transport_interface_t* self, yar_request_t *request) /* {{{ */ {
    fd_set rfds;
    struct timeval tv;
    yar_header_t *header;
    yar_response_t *response;
    int fd, retval, recvd;
    size_t len = 0, total_recvd = 0;
    char *msg, buf[RECV_BUF_SIZE], *payload = NULL;
    yar_socket_data_t *data = (yar_socket_data_t *)self->data;

    response = ecalloc(1, sizeof(yar_response_t));

    //php stram转socket fd
    FD_ZERO(&rfds);
    if (SUCCESS == php_stream_cast(data->stream, PHP_STREAM_AS_FD_FOR_SELECT | PHP_STREAM_CAST_INTERNAL, (void*)&fd, 1) && fd >= 0) {
        PHP_SAFE_FD_SET(fd, &rfds);
    } else {
        len = snprintf(buf, sizeof(buf), "Unable cast socket fd form stream (%s)", strerror(errno));
        php_yar_response_set_error(response, YAR_ERR_TRANSPORT, buf, len);
        return response;
    }

    tv.tv_sec = (ulong)(YAR_G(timeout) / 1000);
    tv.tv_usec = (ulong)((YAR_G(timeout) % 1000)? (YAR_G(timeout) & 1000) * 1000 : 0);

//根据协议头长度循环收包
wait_io:
    retval = php_select(fd+1, &rfds, NULL, NULL, &tv);

    if (retval == -1) {
        len = snprintf(buf, sizeof(buf), "Unable to select %d '%s'", fd, strerror(errno));
        php_yar_response_set_error(response, YAR_ERR_TRANSPORT, buf, len);
        return response;
    } else if (retval == 0) {
        len = snprintf(buf, sizeof(buf), "select timeout %ldms reached", YAR_G(timeout));
        php_yar_response_set_error(response, YAR_ERR_TRANSPORT, buf, len);
        return response;
    }

    if (PHP_SAFE_FD_ISSET(fd, &rfds)) {
        zval *retval, rret;
        //尚未解析出载荷信息/未解析出协议头
        if (!payload) {
            if ((recvd = php_stream_xport_recvfrom(data->stream, buf, sizeof(buf), 0, NULL, NULL, NULL)) > 0) {
                //解析出协议头
                if (!(header = php_yar_protocol_parse(buf))) {
                    php_yar_error(response, YAR_ERR_PROTOCOL, "malformed response header '%.32s'", payload);
                    return response;
                }
                //根据协议头获取载荷长度并分配相应内存接受载荷数据
                payload = emalloc(header->body_len);
                len = header->body_len;
                total_recvd  = recvd - sizeof(yar_header_t);
                            
                memcpy(payload, buf + sizeof(yar_header_t), total_recvd);

                if (recvd < (sizeof(yar_header_t) + len)) {
                    goto wait_io;   
                }
            } else if (recvd < 0) {
                /* this should never happen */
                goto wait_io;
            }
        } else {
            if ((recvd = php_stream_xport_recvfrom(data->stream, payload + total_recvd, len - total_recvd, 0, NULL, NULL, NULL)) > 0) {
                total_recvd += recvd;
            }

            if (total_recvd < len) {
                goto wait_io;
            }
        }
        //此处已经收包结束
        if (len) {
            //调用相应打包器反序列化载荷数据成一个PHP数组变量到`rret`和`retval`中,具体参考response章节
            if (!(retval = php_yar_packager_unpack(payload, len, &msg, &rret))) {
                php_yar_response_set_error(response, YAR_ERR_PACKAGER, msg, strlen(msg));
                efree(msg);
                return response;
            }
            //isore数组变量转response结构
            php_yar_response_map_retval(response, retval);

            DEBUG_C(ZEND_ULONG_FMT": server response content packaged by '%.*s', len '%ld', content '%.32s'",
                    response->id, 7, payload, header->body_len, payload + 8);

            efree(payload);
            zval_ptr_dtor(retval);
        } else {
            php_yar_response_set_error(response, YAR_ERR_EMPTY_RESPONSE, ZEND_STRL("empty response"));
        }
        return response;
    } else {
        goto wait_io;
    }
} /* }}} */

yar_transport_interface->exec()用于IO收包,并将返回的数据封装成response变量返回。

close()

//socket.c
void php_yar_socket_close(yar_transport_interface_t* self) /* {{{ */ {
    yar_socket_data_t *data = (yar_socket_data_t *)self->data;

    if (!data) {
        return;
    }
    //长连接模式不关闭连接
    if (!data->persistent && data->stream) {
        php_stream_close(data->stream);
    }

    efree(data);
    efree(self);

    return;
}
/* }}} */

yar_transport_interface->exec()->close()用于相关资源的回收和释放。

yar_transport_interface的其他接口方法

yar_transport_interface->exec()->setopt()用于相关配置项的配置,socket传输器没有实现并使用该接口,curl在exec()方法中用于设置超时信息。
yar_transport_interface->exec()->call_data()目前仅并发调用会用到,这个我们在并发调用章节再提及。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,029评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,395评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,570评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,535评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,650评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,850评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,006评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,747评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,207评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,536评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,683评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,342评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,964评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,772评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,004评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,401评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,566评论 2 349

推荐阅读更多精彩内容