BRPC 基于zookeeper的命名服务实现

命名服务

命名服务是用将易记的名称(通常是由人类可读的名称)映射到特定的网络资源或服务。它为用户提供了一种方便的方式来访问网络资源,而无需记住资源的物理地址或复杂的网络标识符。

BRPC NamingService

在brpc中,使用NamingService框架来管理命名服务,同时集成负载均衡策略。
在当前BRPC版本(1.7.0)中已经内置了多种命名服务实现其中包括:BNS、DNS、File、consul等方式。
而zookeeper也是一种比较常见、常用的命名服务工具,以下是通过zookeeper搭建命名服务并集成到BRPC的NamingService框架中。

zookeeper命名服务的实现

第一步:初始化连接ZK服务,设置命名服务名称节点

int Connect() {
        int ret = 0;
        // 设置 ZK 日志级别
        zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
        // 完成zk初始化,获取zk句柄,连接ZK服务
        zk_handle_ = zookeeper_init(zk_config_.GetServers().c_str(), zk_watcher_g,
                                    zk_config_.GetTimeout(), 0, this, 0);

        if (NULL == zk_handle_) {
            LOG(ERROR) << "Error when connecting to zookeeper servers...";
            return -1;
        }
        // 完成zk验证
        if (!zk_config_.GetUserAuth().empty()) {
            ret = zoo_add_auth(zk_handle_, "digest", zk_config_.GetUserAuth().c_str(),
                               zk_config_.GetUserAuth().length(), NULL, NULL);
            if (ret) {
                LOG(ERROR) << "error" << ret << " for verification.";
                return -2;
            }
        }
        // 设置命名服务ZK节点路径 /naming_service_example/naming_service1
        // 用于watch服务信息变化
        data_dir_ = zk_config_.GetDataDir();
        return 0;
    };

第二步:添加Watcher 持续监控命名服务下的节点变化

/**
     * AddWatcher 添加 命名服务 节点监控
     * **/
void AddWatcher() {
    int ret = zoo_get_children(zk_handle_, data_dir_.c_str(), 1, &children_names_);
    if (ret != ZOK || children_names_.count <= 0) {
        LOG(INFO) << "default zookeeper data parser data is empty." << children_names_.count;
    }
    // 处理命名服务节点下的内容信息
    parser_children_names();
    runing_ = true;
    watcher_thread_ = std::thread(&ZookeeperClient::do_parser_loop, this);
};

第三步:持续添加节点变化watcher
由于zookeeper的注册event是一次性的,因此,watcher在接收到上次注册的event以后就会失效,为了持续监听zookeeper服务节点变化,每次watcher失效以后需要重新注册watcher事件,因为命名服务一直监听命名服务节点下的变化,因此可以通过zoo_get_children方法进行watcher的重新注册的功能

// 可通过 设置zoo_get_children的第三个参数 watch 为非0 达到重新注册的目的,注册的回调函数为zookeeper_init 传入的 zk_watcher_g 方法
ZOOAPI int zoo_get_children(zhandle_t *zh, const char *path, int watch,
                            struct String_vector *strings)
static void zk_watcher_g(zhandle_t* zh, int type, int state, const char* path,
                             void* watcherCtx) {
        ZookeeperClient *self = (ZookeeperClient *)watcherCtx;
        if (type == ZOO_CHILD_EVENT) {
            std::lock_guard<std::mutex> lock(self->mutex_);
            zoo_get_children(zh, path, 1, &self->children_names_);
            self->is_changed_ = true;
        } else if (ZOO_SESSION_EVENT == type) {
            if (ZOO_CONNECTED_STATE == state) {
                LOG(INFO) << "Connected to ZooKeeper.";
            } else if (ZOO_EXPIRED_SESSION_STATE == state) {
                LOG(INFO) << "ZooKeeper session expired.";
                // TODO: Handle session expiration
                // do reconect
                for (int i = 0; i < 10; i++) {
                    if (0 == self->Connect()) {
                        break;
                    }
                    LOG(WARNING) << "Do zookeeper reconnect, times:" << i;
                    sleep(5);
                }
            }
        }
    };

zookeeper命名服务集成到BRPC

命名服务集成到BRPC 的NamingService框架只需要集成NamingService然后重写父类方法即可,最好通过注册的方式,集成到框架中
第一步:继承brpc::NamingService 类

class ZooKeeperNamingService : public brpc::PeriodicNamingService {}

第二步:重写父类方法实现自定义处理逻辑

// 持续获取命名服务节点下的服务器列表,根据各自的业务需求实现列表获取逻辑
// 列表通过 zk watcher 持续监听变化更新
int GetServers(const char *service_name, std::vector<brpc::ServerNode> *servers) override;
// 服务列表更新时间间隔,也就是框架调用GetServers方法的时间间隔
int GetNamingServiceAccessIntervalMs() const override;
void Describe(std::ostream &os, const brpc::DescribeOptions &) const override;
// 生成 命名服务对象
NamingService *New() const override;
void Destroy() override;

第三步:集成到BRPC的NamingService框架

// 设置ZK连接信息
ZKNamingService::ZKConfig zk_config("127.0.0.1:2181", "",
                              "test", "ns_test", 1000);
// 初始化内部ZK连接信息
ZKNamingService::ZooKeeperNamingService zkns(zk_config, 2000);
// 将命名服务注册到 NamingService框架,该步骤只是将命名服务类注册到框架中,并没有实际执行,
// 在channel.Init()时通过brpc::NamingService* ZooKeeperNamingService::New();方法重新生成对象并执行相关逻辑
brpc::NamingServiceExtension()->RegisterOrDie("zk", &zkns);
brpc::Channel channel;

// Initialize the channel, NULL means using default options.
brpc::ChannelOptions options;
options.timeout_ms = 1000 /*milliseconds*/;
options.max_retry = 1;
// 使用命名服务
if (channel.Init("zk://ns_test", "rr", &options) != 0) {
    LOG(ERROR) << "Fail to initialize channel";
    return -1;
}

参考文档

brpc客户端命名服务
brpc负载均衡
BNS命名服务
consul命名服务

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容