几个文件
-
php_streams.h
定义了php数据流的结构与相关函数 -
php_streams.c
定义了php流的相关函数的具体实现 -
xp_socket.c
定义了socket连接相关的具体函数(封装了network.c的函数,并实现了读写方法) -
php_stream_transport.h
socket和stream流转换函数定义头文件 -
transports.c
socket和stream流转换函数具体实现 -
php_network.h
network.c的头文件 -
network.c
最终执行c的网络编程库netinet/tcp.h的函数去创建、绑定、连接(没封装读写)
php
首先我们先来看看php代码,通常我们连接redis时,会先创建一个Redis对象,然后调用他的connect方法连接redis
// 创建redis对象
$redis = new Redis();
// 连接redis
$result = $redis->connect($host, $port, $timeout);
那么这两句代码c语言是怎么实现的呢
c
- 类的定义
我们先找到redis扩展是如何定义redis类的
// file: redis.c
/* Redis class */
// INIT_CLASS_ENTRY是zend定义的宏方法,他用来告诉虚拟机如何解释Redis这个类
/**
* 入参
* 1、是zend定义的zend_class_entry结构体,用来存放类的结构,这里只要声明该类型的变量传进去就可以,
INIT_CLASS_ENTRY方法将为它分配内存空间并将类名及方法注册到内存中
* 2、是类名
* 3、是类拥有的方法,这里通过redis_get_methods方法返回所有方法的定义
*/
INIT_CLASS_ENTRY(redis_class_entry, "Redis", redis_get_methods());
// 将类的结构注册到zend中
redis_ce = zend_register_internal_class(&redis_class_entry);
// 设置类的创建方法,用new关键字创建类时将执行该方法,这里的方法是create_redis_object
redis_ce->create_object = create_redis_object;
部分方法定义
// file: redis_legacy_arginfo.h
ZEND_METHOD(Redis, __construct); // 为类注册__construct函数(zend会自动执行,redis.c第612行)
ZEND_METHOD(Redis, connect); // 为类注册connect函数
-
new Redis()
接下来我们看看create_redis_object干了什么
// file: redis.c
zend_object *
create_redis_object(zend_class_entry *ce)
{
// 为分配一块内存给redis变量,该变量为redis_object类型
redis_object *redis = ecalloc(1, sizeof(redis_object) + zend_object_properties_size(ce));
// redis的socket初始化为空
redis->sock = NULL;
/**
* zend_object_std_init方法用来根据类的结构创建zend_object,然后赋值给redis->std,因为redis是内存指
* 针,所以用&符号指定内存位置
*/
zend_object_std_init(&redis->std, ce);
// 初始化zend_object对象的属性
object_properties_init(&redis->std, ce);
/**
* 将zend的object_handler指针复制一份到redis_object_handlers,意思是redis_object_handlers也指向了
* object_handler
*/
memcpy(&redis_object_handlers, zend_get_std_object_handlers(), sizeof(redis_object_handlers));
// 在object_handler中找到管理redis对象的位置,并让操作指针指向该位置
redis_object_handlers.offset = XtOffsetOf(redis_object, std);
// 在该位置设置对象的销毁方法
redis_object_handlers.free_obj = free_redis_object;
redis->std.handlers = &redis_object_handlers;
// 返回redis对象
return &redis->std;
}
下面是redis_object的结构体
// file: common.h
typedef struct {
RedisSock *sock;
zend_object std;
} redis_object;
$redis->connect($host, $port, $timeout);
redis.c
// file:redis.c
/* {{{ proto boolean Redis::connect(string host, int port [, double timeout [, long retry_interval]])
*/
PHP_METHOD(Redis, connect)
{
if (redis_connect(INTERNAL_FUNCTION_PARAM_PASSTHRU, 0) == FAILURE) {
RETURN_FALSE;
} else {
RETURN_TRUE;
}
}
PHP_REDIS_API int
redis_connect(INTERNAL_FUNCTION_PARAMETERS, int persistent)
{
zval *object, *context = NULL, *ele;
char *host = NULL, *persistent_id = NULL;
zend_long port = -1, retry_interval = 0;
size_t host_len, persistent_id_len;
double timeout = 0.0, read_timeout = 0.0;
redis_object *redis;
// 线程安全模式下,不允许持久连接
#ifdef ZTS
/* not sure how in threaded mode this works so disabled persistence at
* first */
persistent = 0;
#endif
if (zend_parse_method_parameters(ZEND_NUM_ARGS(), getThis(),
"Os|lds!lda", &object, redis_ce, &host,
&host_len, &port, &timeout, &persistent_id,
&persistent_id_len, &retry_interval,
&read_timeout, &context) == FAILURE)
{
return FAILURE;
}
// 下面都是参数校验
// ……省略了
redis = PHPREDIS_ZVAL_GET_OBJECT(redis_object, object);
// 判断如果有链接了,先断开,会将redis->sock的status属性设置为:REDIS_SOCK_STATUS_DISCONNECTED
/* if there is a redis sock already we have to remove it */
if (redis->sock) {
redis_sock_disconnect(redis->sock, 0);
redis_free_socket(redis->sock);
}
// 调用redis_sock_create方法获取redis_socket(只是获取,没连接)
redis->sock = redis_sock_create(host, host_len, port, timeout, read_timeout, persistent,
persistent_id, retry_interval);
// ……又是一些设置,省略了
// 调用redis_sock_server_open进行连接
if (redis_sock_server_open(redis->sock) < 0) {
// 小于0,连接失败,如果有错误,调用函数抛异常
if (redis->sock->err) {
REDIS_THROW_EXCEPTION(ZSTR_VAL(redis->sock->err), 0);
}
// 释放内存
redis_free_socket(redis->sock);
redis->sock = NULL;
// 返回失败
return FAILURE;
}
// 返回成功
return SUCCESS;
}
library.c
// file:library.c
/**
* redis_sock_server_open
*/
PHP_REDIS_API int
redis_sock_server_open(RedisSock *redis_sock)
{
if (redis_sock) {
switch (redis_sock->status) {
case REDIS_SOCK_STATUS_DISCONNECTED:
// 当前状态是未连接,走的是这里,调用redis_sock_connect方法连接
if (redis_sock_connect(redis_sock) != SUCCESS) {
break;
} else if (redis_sock->status == REDIS_SOCK_STATUS_READY) {
return SUCCESS;
}
// fall through
case REDIS_SOCK_STATUS_CONNECTED:
if (redis_sock_auth(redis_sock) != SUCCESS)
break;
redis_sock->status = REDIS_SOCK_STATUS_READY;
// fall through
case REDIS_SOCK_STATUS_READY:
return SUCCESS;
default:
return FAILURE;
}
}
return FAILURE;
}
/**
* redis_sock_connect
*/
PHP_REDIS_API int redis_sock_connect(RedisSock *redis_sock)
{
// ……前面是一堆参数初始化以及校验,省略了
#ifdef HAVE_IPV6
/* If we've got IPv6 and find a colon in our address, convert to proper
* IPv6 [host]:port format */
if (strchr(address, ':') != NULL) {
fmtstr = "%s://[%s]:%d";
}
#endif
host_len = snprintf(host, sizeof(host), fmtstr, scheme, address, redis_sock->port);
}
if (scheme_free) efree(scheme);
// 如果是持久化连接
if (redis_sock->persistent) {
// 是否启用连接池
if (INI_INT("redis.pconnect.pooling_enabled")) {
// 从连接池获取连接(先不管这种模式,NTS模式下才有ZTS没有)
p = redis_sock_get_connection_pool(redis_sock);
// ……后面先省略了
} else {
// 没启用连接池
// ……也先省略了
}
}
// ……又有一些参数组装
// 重要的是调用了php_stream_xport_create去获取用来操作的流
redis_sock->stream = php_stream_xport_create(host, host_len,
0, STREAM_XPORT_CLIENT | STREAM_XPORT_CONNECT,
persistent_id ? ZSTR_VAL(persistent_id) : NULL,
tv_ptr, redis_sock->stream_ctx, &estr, &err);
// ……这里不重要了,省略
// 连接成功后将状态改为REDIS_SOCK_STATUS_CONNECTED(已连接)
redis_sock->status = REDIS_SOCK_STATUS_CONNECTED;
return SUCCESS;
}
transports.c
PHPAPI php_stream *_php_stream_xport_create(const char *name, size_t namelen, int options,
int flags, const char *persistent_id,
struct timeval *timeout,
php_stream_context *context,
zend_string **error_string,
int *error_code
STREAMS_DC)
{
// ……一堆参数初始化及校验
/* check for a cached persistent socket */
if (persistent_id) {
// ……NTS模式的,先省略
}
// 获取协议头,这里没有,所以默认是tcp
if ((*p == ':') && (n > 1) && !strncmp("://", p, 3)) {
protocol = name;
name = p + 3;
namelen -= n + 3;
} else {
protocol = "tcp";
n = 3;
}
if (protocol) {
// 获取工厂方法,这里因为是tcp模式,获取的是tcp的工厂方法
if (NULL == (factory = zend_hash_str_find_ptr(&xport_hash, protocol, n))) {
char wrapper_name[32];
if (n >= sizeof(wrapper_name))
n = sizeof(wrapper_name) - 1;
PHP_STRLCPY(wrapper_name, protocol, sizeof(wrapper_name), n);
ERR_REPORT(error_string, "Unable to find the socket transport \"%s\" - did you forget to enable it when you configured PHP?",
wrapper_name);
return NULL;
}
}
if (factory == NULL) {
/* should never happen */
php_error_docref(NULL, E_WARNING, "Could not find a factory !?");
return NULL;
}
//调用工厂方法创建stream,默认是tcp的工厂(只是创建了stream,没有连接)
stream = (factory)(protocol, n,
(char*)name, namelen, persistent_id, options, flags, timeout,
context STREAMS_REL_CC);
if (stream) {
php_stream_context_set(stream, context);
// 位运算,表示不是服务器端
if ((flags & STREAM_XPORT_SERVER) == 0) {
/* client */
// 判断flags是否要进行连接或者是异步连接
// 是则调用php_stream_xport_connect进行连接,返回-1就是连接失败,报错
if (flags & (STREAM_XPORT_CONNECT|STREAM_XPORT_CONNECT_ASYNC)) {
if (-1 == php_stream_xport_connect(stream, name, namelen,
flags & STREAM_XPORT_CONNECT_ASYNC ? 1 : 0,
timeout, &error_text, error_code)) {
ERR_RETURN(error_string, error_text, "connect() failed: %s");
failed = 1;
}
}
} else {
/* server */
// ……服务器端的,先省略
}
}
if (failed) {
/* failure means that they don't get a stream to play with */
if (persistent_id) {
php_stream_pclose(stream);
} else {
php_stream_close(stream);
}
stream = NULL;
}
return stream;
}
/* Connect to a remote address */
PHPAPI int php_stream_xport_connect(php_stream *stream,
const char *name, size_t namelen,
int asynchronous,
struct timeval *timeout,
zend_string **error_text,
int *error_code
)
{
// ……参数设置与校验,省略
// 调用php_stream_set_option,该方法进行了连接
ret = php_stream_set_option(stream, PHP_STREAM_OPTION_XPORT_API, 0, ¶m);
if (ret == PHP_STREAM_OPTION_RETURN_OK) {
if (error_text) {
*error_text = param.outputs.error_text;
}
if (error_code) {
*error_code = param.outputs.error_code;
}
return param.outputs.returncode;
}
return ret;
}
streams.c
// file:streams.c
// 设置流,实际上调用了socket的操作(如:php_tcp_sockop_set_option,根据连接协议的不同而不同)
PHPAPI int _php_stream_set_option(php_stream *stream, int option, int value, void *ptrparam)
{
int ret = PHP_STREAM_OPTION_RETURN_NOTIMPL;
if (stream->ops->set_option) {
/**
* 调用set_option方法,根据前面工厂生成的stream不同而不同
* 由于是tcp工厂生成的,这里调用的是php_tcp_sockop_set_option
*/
ret = stream->ops->set_option(stream, option, value, ptrparam);
}
// ……后面不重要了,省略
return ret;
}
xp_socket.c
//file: xp_socket.c
// 连接socket的关键
static int php_tcp_sockop_set_option(php_stream *stream, int option, int value, void *ptrparam)
{
// 获得socket套接字的内存地址
php_netstream_data_t *sock = (php_netstream_data_t*)stream->abstract;
php_stream_xport_param *xparam;
switch(option) {
case PHP_STREAM_OPTION_XPORT_API:
xparam = (php_stream_xport_param *)ptrparam;
switch(xparam->op) {
// 如果option是连接,就调用php_tcp_sockop_connect(stream, sock, xparam);进行连接
case STREAM_XPORT_OP_CONNECT: // 同步阻塞连接
case STREAM_XPORT_OP_CONNECT_ASYNC: // 异步连接
xparam->outputs.returncode = php_tcp_sockop_connect(stream, sock, xparam);
return PHP_STREAM_OPTION_RETURN_OK;
// 如果option是绑定,php_tcp_sockop_bind(stream, sock, xparam);进行端口绑定
case STREAM_XPORT_OP_BIND:
xparam->outputs.returncode = php_tcp_sockop_bind(stream, sock, xparam);
return PHP_STREAM_OPTION_RETURN_OK;
// 如果option是接收链接(服务端用),php_tcp_sockop_accept(stream, sock, xparam);进行阻塞等待连接进入
case STREAM_XPORT_OP_ACCEPT:
xparam->outputs.returncode = php_tcp_sockop_accept(stream, sock, xparam STREAMS_CC);
return PHP_STREAM_OPTION_RETURN_OK;
default:
/* fall through */
;
}
}
return php_sockop_set_option(stream, option, value, ptrparam);
}
static inline int php_tcp_sockop_connect(php_stream *stream, php_netstream_data_t *sock,
php_stream_xport_param *xparam)
{
// ……初始化了写参数,省略
#ifdef AF_UNIX
// 如果是本地套接字模式(入参host用localhost的场景)
if (stream->ops == &php_stream_unix_socket_ops || stream->ops == &php_stream_unixdg_socket_ops) {
struct sockaddr_un unix_addr;
// 调用socket方法创建获取套接字的文件描述符(int)
sock->socket = socket(PF_UNIX, stream->ops == &php_stream_unix_socket_ops ? SOCK_STREAM : SOCK_DGRAM, 0);
if (sock->socket == SOCK_ERR) {
if (xparam->want_errortext) {
xparam->outputs.error_text = strpprintf(0, "Failed to create unix socket");
}
return -1;
}
parse_unix_address(xparam, &unix_addr);
// 调用php_network_connect_socket进行连接(connect)
ret = php_network_connect_socket(sock->socket,
(const struct sockaddr *)&unix_addr, (socklen_t) XtOffsetOf(struct sockaddr_un, sun_path) + xparam->inputs.namelen,
xparam->op == STREAM_XPORT_OP_CONNECT_ASYNC, xparam->inputs.timeout,
xparam->want_errortext ? &xparam->outputs.error_text : NULL,
&err);
xparam->outputs.error_code = err;
// 连接失败直接跳到该方法后面的out位置
goto out;
}
#endif
// …… 又是一堆参数初始化及非空校验,省略
/* Note: the test here for php_stream_udp_socket_ops is important, because we
* want the default to be TCP sockets so that the openssl extension can
* re-use this code. */
// 非本地套接字模式(tcp协议)
sock->socket = php_network_connect_socket_to_host(host, portno,
stream->ops == &php_stream_udp_socket_ops ? SOCK_DGRAM : SOCK_STREAM,
xparam->op == STREAM_XPORT_OP_CONNECT_ASYNC,
xparam->inputs.timeout,
xparam->want_errortext ? &xparam->outputs.error_text : NULL,
&err,
bindto,
bindport,
sockopts
);
// 后面不重要了
ret = sock->socket == -1 ? -1 : 0;
xparam->outputs.error_code = err;
if (host) {
efree(host);
}
if (bindto) {
efree(bindto);
}
#ifdef AF_UNIX
out:
#endif
if (ret >= 0 && xparam->op == STREAM_XPORT_OP_CONNECT_ASYNC && err == EINPROGRESS) {
/* indicates pending connection */
return 1;
}
return ret;
}
network.c
// file:network.c
/* {{{ php_network_connect_socket_to_host */
php_socket_t php_network_connect_socket_to_host(const char *host, unsigned short port,
int socktype, int asynchronous, struct timeval *timeout, zend_string **error_string,
int *error_code, const char *bindto, unsigned short bindport, long sockopts
)
{
// …… 前面暂时不重要
// 这里再循环啥暂时没看懂
for (sal = psal; !fatal && *sal != NULL; sal++) {
sa = *sal;
switch (sa->sa_family) {
#if HAVE_GETADDRINFO && HAVE_IPV6
case AF_INET6: // ipv6
if (!bindto || strchr(bindto, ':')) {
((struct sockaddr_in6 *)sa)->sin6_port = htons(port);
socklen = sizeof(struct sockaddr_in6);
} else {
/* Expect IPV4 address, skip to the next */
continue;
}
break;
#endif
case AF_INET: // ipv4
((struct sockaddr_in *)sa)->sin_port = htons(port);
socklen = sizeof(struct sockaddr_in);
if (bindto && strchr(bindto, ':')) {
/* IPV4 sock cannot bind to IPV6 address */
bindto = NULL;
}
break;
default:
/* Unsupported family, skip to the next */
continue;
}
// 创建套接字文件描述符
/* create a socket for this address */
sock = socket(sa->sa_family, socktype, 0);
if (sock == SOCK_ERR) {
continue;
}
/* make a connection attempt */
if (bindto) {
// ……一堆处理ip地址的操作
// 绑定地址(bind方法)
// ip地址为空,报warnning,否则调用bind
if (local_address_len == 0) {
php_error_docref(NULL, E_WARNING, "Invalid IP Address: %s", bindto);
} else if (bind(sock, &local_address.common, local_address_len)) {
php_error_docref(NULL, E_WARNING, "Failed to bind to '%s:%d', system said: %s", bindto, bindport, strerror(errno));
}
}
// ……中间这里有一堆先不管了,省略
// 最后还是调用php_network_connect_socket进行连接
n = php_network_connect_socket(sock, sa, socklen, asynchronous,
timeout ? &working_timeout : NULL,
error_string, error_code);
if (n != -1) {
goto connected; // 连接成功跳转到connected位置
}
// ……中间省略一堆
connected:
// 释放存放IP地址占用的内存
php_network_freeaddresses(psal);
return sock;
}
/* }}} */
/* {{{ php_network_connect_socket */
PHPAPI int php_network_connect_socket(php_socket_t sockfd,
const struct sockaddr *addr,
socklen_t addrlen,
int asynchronous,
struct timeval *timeout,
zend_string **error_string,
int *error_code)
{
php_non_blocking_flags_t orig_flags;
int n;
int error = 0;
socklen_t len;
int ret = 0;
SET_SOCKET_BLOCKING_MODE(sockfd, orig_flags);
// 调用connect方法
if ((n = connect(sockfd, addr, addrlen)) != 0) {
error = php_socket_errno();
if (error_code) {
*error_code = error;
}
if (error != EINPROGRESS) {
if (error_string) {
*error_string = php_socket_error_str(error);
}
return -1;
}
if (asynchronous && error == EINPROGRESS) {
/* this is fine by us */
return 0;
}
}
if (n == 0) {
goto ok;
}
// ……中间一堆先省略
ok:
if (!asynchronous) {
/* back to blocking mode */
RESTORE_SOCKET_BLOCKING_MODE(sockfd, orig_flags);
}
if (error_code) {
*error_code = error;
}
if (error) {
ret = -1;
if (error_string) {
*error_string = php_socket_error_str(error);
}
}
return ret;
}
/* }}} */