phpredis是如何链接redis服务器的

几个文件

  • php_streams.h 定义了php数据流的结构与相关函数
  • php_streams.c 定义了php流的相关函数的具体实现
  • xp_socket.c 定义了socket连接相关的具体函数(封装了network.c的函数,并实现了读写方法)
  • php_stream_transport.h socket和stream流转换函数定义头文件
  • transports.c socket和stream流转换函数具体实现
  • php_network.h network.c的头文件
  • network.c 最终执行c的网络编程库netinet/tcp.h的函数去创建、绑定、连接(没封装读写)

php

首先我们先来看看php代码,通常我们连接redis时,会先创建一个Redis对象,然后调用他的connect方法连接redis

// 创建redis对象
$redis = new Redis();
// 连接redis
$result = $redis->connect($host, $port, $timeout);

那么这两句代码c语言是怎么实现的呢

c

  • 类的定义
    我们先找到redis扩展是如何定义redis类的
// file: redis.c

/* Redis class */
// INIT_CLASS_ENTRY是zend定义的宏方法,他用来告诉虚拟机如何解释Redis这个类
/**
 * 入参
 * 1、是zend定义的zend_class_entry结构体,用来存放类的结构,这里只要声明该类型的变量传进去就可以,
INIT_CLASS_ENTRY方法将为它分配内存空间并将类名及方法注册到内存中
 * 2、是类名
 * 3、是类拥有的方法,这里通过redis_get_methods方法返回所有方法的定义
 */
INIT_CLASS_ENTRY(redis_class_entry, "Redis", redis_get_methods());
// 将类的结构注册到zend中
redis_ce = zend_register_internal_class(&redis_class_entry);
// 设置类的创建方法,用new关键字创建类时将执行该方法,这里的方法是create_redis_object
redis_ce->create_object = create_redis_object;

部分方法定义

// file: redis_legacy_arginfo.h
ZEND_METHOD(Redis, __construct); // 为类注册__construct函数(zend会自动执行,redis.c第612行)
ZEND_METHOD(Redis, connect); // 为类注册connect函数
  • new Redis()
    接下来我们看看create_redis_object干了什么
// file: redis.c

zend_object *
create_redis_object(zend_class_entry *ce)
{
    // 为分配一块内存给redis变量,该变量为redis_object类型
    redis_object *redis = ecalloc(1, sizeof(redis_object) + zend_object_properties_size(ce));

    // redis的socket初始化为空
    redis->sock = NULL;
    /** 
      * zend_object_std_init方法用来根据类的结构创建zend_object,然后赋值给redis->std,因为redis是内存指
      * 针,所以用&符号指定内存位置
      */
    zend_object_std_init(&redis->std, ce);
    // 初始化zend_object对象的属性
    object_properties_init(&redis->std, ce);

   /**
     * 将zend的object_handler指针复制一份到redis_object_handlers,意思是redis_object_handlers也指向了
     * object_handler
     */
    memcpy(&redis_object_handlers, zend_get_std_object_handlers(), sizeof(redis_object_handlers));
    // 在object_handler中找到管理redis对象的位置,并让操作指针指向该位置
    redis_object_handlers.offset = XtOffsetOf(redis_object, std);
    // 在该位置设置对象的销毁方法
    redis_object_handlers.free_obj = free_redis_object;
    redis->std.handlers = &redis_object_handlers;

    // 返回redis对象
    return &redis->std;
}

下面是redis_object的结构体

// file: common.h
typedef struct {
    RedisSock *sock;
    zend_object std;
} redis_object;
  • $redis->connect($host, $port, $timeout);

redis.c

// file:redis.c
/* {{{ proto boolean Redis::connect(string host, int port [, double timeout [, long retry_interval]])
 */
PHP_METHOD(Redis, connect)
{
    if (redis_connect(INTERNAL_FUNCTION_PARAM_PASSTHRU, 0) == FAILURE) {
        RETURN_FALSE;
    } else {
        RETURN_TRUE;
    }
}

PHP_REDIS_API int
redis_connect(INTERNAL_FUNCTION_PARAMETERS, int persistent)
{
    zval *object, *context = NULL, *ele;
    char *host = NULL, *persistent_id = NULL;
    zend_long port = -1, retry_interval = 0;
    size_t host_len, persistent_id_len;
    double timeout = 0.0, read_timeout = 0.0;
    redis_object *redis;

// 线程安全模式下,不允许持久连接
#ifdef ZTS
    /* not sure how in threaded mode this works so disabled persistence at
     * first */
    persistent = 0;
#endif

    if (zend_parse_method_parameters(ZEND_NUM_ARGS(), getThis(),
                                     "Os|lds!lda", &object, redis_ce, &host,
                                     &host_len, &port, &timeout, &persistent_id,
                                     &persistent_id_len, &retry_interval,
                                     &read_timeout, &context) == FAILURE)
    {
        return FAILURE;
    }

    // 下面都是参数校验
    // ……省略了

    redis = PHPREDIS_ZVAL_GET_OBJECT(redis_object, object);
    // 判断如果有链接了,先断开,会将redis->sock的status属性设置为:REDIS_SOCK_STATUS_DISCONNECTED
    /* if there is a redis sock already we have to remove it */
    if (redis->sock) {
        redis_sock_disconnect(redis->sock, 0);
        redis_free_socket(redis->sock);
    }
    // 调用redis_sock_create方法获取redis_socket(只是获取,没连接)
    redis->sock = redis_sock_create(host, host_len, port, timeout, read_timeout, persistent,
        persistent_id, retry_interval);

    // ……又是一些设置,省略了

    // 调用redis_sock_server_open进行连接
    if (redis_sock_server_open(redis->sock) < 0) {
        // 小于0,连接失败,如果有错误,调用函数抛异常
        if (redis->sock->err) {
            REDIS_THROW_EXCEPTION(ZSTR_VAL(redis->sock->err), 0);
        }
        // 释放内存
        redis_free_socket(redis->sock);
        redis->sock = NULL;
        // 返回失败
        return FAILURE;
    }
    // 返回成功
    return SUCCESS;
}

library.c

// file:library.c
/**
 * redis_sock_server_open
 */
PHP_REDIS_API int
redis_sock_server_open(RedisSock *redis_sock)
{
    if (redis_sock) {
        switch (redis_sock->status) {
        case REDIS_SOCK_STATUS_DISCONNECTED:
            // 当前状态是未连接,走的是这里,调用redis_sock_connect方法连接
            if (redis_sock_connect(redis_sock) != SUCCESS) {
                break;
            } else if (redis_sock->status == REDIS_SOCK_STATUS_READY) {
                return SUCCESS;
            }
            // fall through
        case REDIS_SOCK_STATUS_CONNECTED:
            if (redis_sock_auth(redis_sock) != SUCCESS)
                break;
            redis_sock->status = REDIS_SOCK_STATUS_READY;
            // fall through
        case REDIS_SOCK_STATUS_READY:
            return SUCCESS;
        default:
            return FAILURE;
        }
    }
    return FAILURE;
}

/**
 * redis_sock_connect
 */
PHP_REDIS_API int redis_sock_connect(RedisSock *redis_sock)
{
        // ……前面是一堆参数初始化以及校验,省略了
#ifdef HAVE_IPV6
        /* If we've got IPv6 and find a colon in our address, convert to proper
         * IPv6 [host]:port format */
        if (strchr(address, ':') != NULL) {
            fmtstr = "%s://[%s]:%d";
        }
#endif
        host_len = snprintf(host, sizeof(host), fmtstr, scheme, address, redis_sock->port);
    }
    if (scheme_free) efree(scheme);
    // 如果是持久化连接
    if (redis_sock->persistent) {
        // 是否启用连接池
        if (INI_INT("redis.pconnect.pooling_enabled")) {
            // 从连接池获取连接(先不管这种模式,NTS模式下才有ZTS没有)
            p = redis_sock_get_connection_pool(redis_sock);
            // ……后面先省略了
        } else {
            // 没启用连接池
            // ……也先省略了
        }
    }

    // ……又有一些参数组装

    // 重要的是调用了php_stream_xport_create去获取用来操作的流
    redis_sock->stream = php_stream_xport_create(host, host_len,
        0, STREAM_XPORT_CLIENT | STREAM_XPORT_CONNECT,
        persistent_id ? ZSTR_VAL(persistent_id) : NULL,
        tv_ptr, redis_sock->stream_ctx, &estr, &err);

    // ……这里不重要了,省略
    // 连接成功后将状态改为REDIS_SOCK_STATUS_CONNECTED(已连接)
    redis_sock->status = REDIS_SOCK_STATUS_CONNECTED;

    return SUCCESS;
}

transports.c

PHPAPI php_stream *_php_stream_xport_create(const char *name, size_t namelen, int options,
        int flags, const char *persistent_id,
        struct timeval *timeout,
        php_stream_context *context,
        zend_string **error_string,
        int *error_code
        STREAMS_DC)
{
    // ……一堆参数初始化及校验

    /* check for a cached persistent socket */
    if (persistent_id) {
        // ……NTS模式的,先省略
    }


        // 获取协议头,这里没有,所以默认是tcp
    if ((*p == ':') && (n > 1) && !strncmp("://", p, 3)) {
        protocol = name;
        name = p + 3;
        namelen -= n + 3;
    } else {
        protocol = "tcp";
        n = 3;
    }

    if (protocol) {
             // 获取工厂方法,这里因为是tcp模式,获取的是tcp的工厂方法
        if (NULL == (factory = zend_hash_str_find_ptr(&xport_hash, protocol, n))) {
            char wrapper_name[32];

            if (n >= sizeof(wrapper_name))
                n = sizeof(wrapper_name) - 1;
            PHP_STRLCPY(wrapper_name, protocol, sizeof(wrapper_name), n);

            ERR_REPORT(error_string, "Unable to find the socket transport \"%s\" - did you forget to enable it when you configured PHP?",
                    wrapper_name);

            return NULL;
        }
    }

    if (factory == NULL) {
        /* should never happen */
        php_error_docref(NULL, E_WARNING, "Could not find a factory !?");
        return NULL;
    }

    //调用工厂方法创建stream,默认是tcp的工厂(只是创建了stream,没有连接)
    stream = (factory)(protocol, n,
            (char*)name, namelen, persistent_id, options, flags, timeout,
            context STREAMS_REL_CC);

    if (stream) {
        php_stream_context_set(stream, context);

        // 位运算,表示不是服务器端
        if ((flags & STREAM_XPORT_SERVER) == 0) {
            /* client */
            // 判断flags是否要进行连接或者是异步连接
            // 是则调用php_stream_xport_connect进行连接,返回-1就是连接失败,报错
            if (flags & (STREAM_XPORT_CONNECT|STREAM_XPORT_CONNECT_ASYNC)) {
                if (-1 == php_stream_xport_connect(stream, name, namelen,
                            flags & STREAM_XPORT_CONNECT_ASYNC ? 1 : 0,
                            timeout, &error_text, error_code)) {

                    ERR_RETURN(error_string, error_text, "connect() failed: %s");

                    failed = 1;
                }
            }

        } else {
            /* server */
            // ……服务器端的,先省略
        }
    }

    if (failed) {
        /* failure means that they don't get a stream to play with */
        if (persistent_id) {
            php_stream_pclose(stream);
        } else {
            php_stream_close(stream);
        }
        stream = NULL;
    }

    return stream;
}

/* Connect to a remote address */
PHPAPI int php_stream_xport_connect(php_stream *stream,
        const char *name, size_t namelen,
        int asynchronous,
        struct timeval *timeout,
        zend_string **error_text,
        int *error_code
        )
{
    // ……参数设置与校验,省略
    // 调用php_stream_set_option,该方法进行了连接
    ret = php_stream_set_option(stream, PHP_STREAM_OPTION_XPORT_API, 0, &param);

    if (ret == PHP_STREAM_OPTION_RETURN_OK) {
        if (error_text) {
            *error_text = param.outputs.error_text;
        }
        if (error_code) {
            *error_code = param.outputs.error_code;
        }
        return param.outputs.returncode;
    }

    return ret;

}

streams.c

// file:streams.c

// 设置流,实际上调用了socket的操作(如:php_tcp_sockop_set_option,根据连接协议的不同而不同)
PHPAPI int _php_stream_set_option(php_stream *stream, int option, int value, void *ptrparam)
{
    int ret = PHP_STREAM_OPTION_RETURN_NOTIMPL;

    if (stream->ops->set_option) {
               /**
                 * 调用set_option方法,根据前面工厂生成的stream不同而不同
                 * 由于是tcp工厂生成的,这里调用的是php_tcp_sockop_set_option
                 */
        ret = stream->ops->set_option(stream, option, value, ptrparam);
    }

    // ……后面不重要了,省略

    return ret;
}

xp_socket.c

//file: xp_socket.c
// 连接socket的关键
static int php_tcp_sockop_set_option(php_stream *stream, int option, int value, void *ptrparam)
{
        // 获得socket套接字的内存地址
    php_netstream_data_t *sock = (php_netstream_data_t*)stream->abstract;
    php_stream_xport_param *xparam;

    switch(option) {
        case PHP_STREAM_OPTION_XPORT_API:
            xparam = (php_stream_xport_param *)ptrparam;

            switch(xparam->op) {
                // 如果option是连接,就调用php_tcp_sockop_connect(stream, sock, xparam);进行连接
                case STREAM_XPORT_OP_CONNECT: // 同步阻塞连接
                case STREAM_XPORT_OP_CONNECT_ASYNC: // 异步连接
                    xparam->outputs.returncode = php_tcp_sockop_connect(stream, sock, xparam);
                    return PHP_STREAM_OPTION_RETURN_OK;
                // 如果option是绑定,php_tcp_sockop_bind(stream, sock, xparam);进行端口绑定
                case STREAM_XPORT_OP_BIND:
                    xparam->outputs.returncode = php_tcp_sockop_bind(stream, sock, xparam);
                    return PHP_STREAM_OPTION_RETURN_OK;

                // 如果option是接收链接(服务端用),php_tcp_sockop_accept(stream, sock, xparam);进行阻塞等待连接进入
                case STREAM_XPORT_OP_ACCEPT:
                    xparam->outputs.returncode = php_tcp_sockop_accept(stream, sock, xparam STREAMS_CC);
                    return PHP_STREAM_OPTION_RETURN_OK;
                default:
                    /* fall through */
                    ;
            }
    }
    return php_sockop_set_option(stream, option, value, ptrparam);
}

static inline int php_tcp_sockop_connect(php_stream *stream, php_netstream_data_t *sock,
        php_stream_xport_param *xparam)
{
    // ……初始化了写参数,省略

#ifdef AF_UNIX
    // 如果是本地套接字模式(入参host用localhost的场景)
    if (stream->ops == &php_stream_unix_socket_ops || stream->ops == &php_stream_unixdg_socket_ops) {
        struct sockaddr_un unix_addr;

        // 调用socket方法创建获取套接字的文件描述符(int)
        sock->socket = socket(PF_UNIX, stream->ops == &php_stream_unix_socket_ops ? SOCK_STREAM : SOCK_DGRAM, 0);

        if (sock->socket == SOCK_ERR) {
            if (xparam->want_errortext) {
                xparam->outputs.error_text = strpprintf(0, "Failed to create unix socket");
            }
            return -1;
        }

        parse_unix_address(xparam, &unix_addr);

        // 调用php_network_connect_socket进行连接(connect)
        ret = php_network_connect_socket(sock->socket,
                (const struct sockaddr *)&unix_addr, (socklen_t) XtOffsetOf(struct sockaddr_un, sun_path) + xparam->inputs.namelen,
                xparam->op == STREAM_XPORT_OP_CONNECT_ASYNC, xparam->inputs.timeout,
                xparam->want_errortext ? &xparam->outputs.error_text : NULL,
                &err);

        xparam->outputs.error_code = err;
                // 连接失败直接跳到该方法后面的out位置
        goto out;
    }
#endif

    // …… 又是一堆参数初始化及非空校验,省略

    /* Note: the test here for php_stream_udp_socket_ops is important, because we
     * want the default to be TCP sockets so that the openssl extension can
     * re-use this code. */

    // 非本地套接字模式(tcp协议)
    sock->socket = php_network_connect_socket_to_host(host, portno,
            stream->ops == &php_stream_udp_socket_ops ? SOCK_DGRAM : SOCK_STREAM,
            xparam->op == STREAM_XPORT_OP_CONNECT_ASYNC,
            xparam->inputs.timeout,
            xparam->want_errortext ? &xparam->outputs.error_text : NULL,
            &err,
            bindto,
            bindport,
            sockopts
            );

        // 后面不重要了
    ret = sock->socket == -1 ? -1 : 0;
    xparam->outputs.error_code = err;

    if (host) {
        efree(host);
    }
    if (bindto) {
        efree(bindto);
    }

#ifdef AF_UNIX
out:
#endif

    if (ret >= 0 && xparam->op == STREAM_XPORT_OP_CONNECT_ASYNC && err == EINPROGRESS) {
        /* indicates pending connection */
        return 1;
    }

    return ret;
}

network.c

// file:network.c
/* {{{ php_network_connect_socket_to_host */
php_socket_t php_network_connect_socket_to_host(const char *host, unsigned short port,
        int socktype, int asynchronous, struct timeval *timeout, zend_string **error_string,
        int *error_code, const char *bindto, unsigned short bindport, long sockopts
        )
{
    // …… 前面暂时不重要
        // 这里再循环啥暂时没看懂
    for (sal = psal; !fatal && *sal != NULL; sal++) {
        sa = *sal;

        switch (sa->sa_family) {
#if HAVE_GETADDRINFO && HAVE_IPV6
            case AF_INET6: // ipv6
                if (!bindto || strchr(bindto, ':')) {
                    ((struct sockaddr_in6 *)sa)->sin6_port = htons(port);
                    socklen = sizeof(struct sockaddr_in6);
                } else {
                    /* Expect IPV4 address, skip to the next */
                    continue;
                }
                break;
#endif
            case AF_INET: // ipv4
                ((struct sockaddr_in *)sa)->sin_port = htons(port);
                socklen = sizeof(struct sockaddr_in);
                if (bindto && strchr(bindto, ':')) {
                    /* IPV4 sock cannot bind to IPV6 address */
                    bindto = NULL;
                }
                break;
            default:
                /* Unsupported family, skip to the next */
                continue;
        }

        // 创建套接字文件描述符
        /* create a socket for this address */
        sock = socket(sa->sa_family, socktype, 0);

        if (sock == SOCK_ERR) {
            continue;
        }

            /* make a connection attempt */

        if (bindto) {
            // ……一堆处理ip地址的操作
            // 绑定地址(bind方法)
            // ip地址为空,报warnning,否则调用bind
            if (local_address_len == 0) {
                php_error_docref(NULL, E_WARNING, "Invalid IP Address: %s", bindto);
            } else if (bind(sock, &local_address.common, local_address_len)) {
                php_error_docref(NULL, E_WARNING, "Failed to bind to '%s:%d', system said: %s", bindto, bindport, strerror(errno));
            }
        }
        // ……中间这里有一堆先不管了,省略
        // 最后还是调用php_network_connect_socket进行连接
        n = php_network_connect_socket(sock, sa, socklen, asynchronous,
                timeout ? &working_timeout : NULL,
                error_string, error_code);

        if (n != -1) {
            goto connected; // 连接成功跳转到connected位置
        }

        // ……中间省略一堆

connected:
        // 释放存放IP地址占用的内存
    php_network_freeaddresses(psal);

    return sock;
}
/* }}} */

/* {{{ php_network_connect_socket */
PHPAPI int php_network_connect_socket(php_socket_t sockfd,
        const struct sockaddr *addr,
        socklen_t addrlen,
        int asynchronous,
        struct timeval *timeout,
        zend_string **error_string,
        int *error_code)
{
    php_non_blocking_flags_t orig_flags;
    int n;
    int error = 0;
    socklen_t len;
    int ret = 0;

    SET_SOCKET_BLOCKING_MODE(sockfd, orig_flags);

    // 调用connect方法
    if ((n = connect(sockfd, addr, addrlen)) != 0) {
        error = php_socket_errno();

        if (error_code) {
            *error_code = error;
        }

        if (error != EINPROGRESS) {
            if (error_string) {
                *error_string = php_socket_error_str(error);
            }

            return -1;
        }
        if (asynchronous && error == EINPROGRESS) {
            /* this is fine by us */
            return 0;
        }
    }

    if (n == 0) {
        goto ok;
    }
    // ……中间一堆先省略
ok:
    if (!asynchronous) {
        /* back to blocking mode */
        RESTORE_SOCKET_BLOCKING_MODE(sockfd, orig_flags);
    }

    if (error_code) {
        *error_code = error;
    }

    if (error) {
        ret = -1;
        if (error_string) {
            *error_string = php_socket_error_str(error);
        }
    }
    return ret;
}
/* }}} */
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,504评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,434评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,089评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,378评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,472评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,506评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,519评论 3 413
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,292评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,738评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,022评论 2 329
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,194评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,873评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,536评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,162评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,413评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,075评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,080评论 2 352

推荐阅读更多精彩内容