命名服务
命名服务是用将易记的名称(通常是由人类可读的名称)映射到特定的网络资源或服务。它为用户提供了一种方便的方式来访问网络资源,而无需记住资源的物理地址或复杂的网络标识符。
BRPC NamingService
在brpc中,使用NamingService框架来管理命名服务,同时集成负载均衡策略。
在当前BRPC版本(1.7.0)中已经内置了多种命名服务实现其中包括:BNS、DNS、File、consul等方式。
而zookeeper也是一种比较常见、常用的命名服务工具,以下是通过zookeeper搭建命名服务并集成到BRPC的NamingService框架中。
zookeeper命名服务的实现
第一步:初始化连接ZK服务,设置命名服务名称节点
int Connect() {
int ret = 0;
// 设置 ZK 日志级别
zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
// 完成zk初始化,获取zk句柄,连接ZK服务
zk_handle_ = zookeeper_init(zk_config_.GetServers().c_str(), zk_watcher_g,
zk_config_.GetTimeout(), 0, this, 0);
if (NULL == zk_handle_) {
LOG(ERROR) << "Error when connecting to zookeeper servers...";
return -1;
}
// 完成zk验证
if (!zk_config_.GetUserAuth().empty()) {
ret = zoo_add_auth(zk_handle_, "digest", zk_config_.GetUserAuth().c_str(),
zk_config_.GetUserAuth().length(), NULL, NULL);
if (ret) {
LOG(ERROR) << "error" << ret << " for verification.";
return -2;
}
}
// 设置命名服务ZK节点路径 /naming_service_example/naming_service1
// 用于watch服务信息变化
data_dir_ = zk_config_.GetDataDir();
return 0;
};
第二步:添加Watcher 持续监控命名服务下的节点变化
/**
* AddWatcher 添加 命名服务 节点监控
* **/
void AddWatcher() {
int ret = zoo_get_children(zk_handle_, data_dir_.c_str(), 1, &children_names_);
if (ret != ZOK || children_names_.count <= 0) {
LOG(INFO) << "default zookeeper data parser data is empty." << children_names_.count;
}
// 处理命名服务节点下的内容信息
parser_children_names();
runing_ = true;
watcher_thread_ = std::thread(&ZookeeperClient::do_parser_loop, this);
};
第三步:持续添加节点变化watcher
由于zookeeper的注册event是一次性的,因此,watcher在接收到上次注册的event以后就会失效,为了持续监听zookeeper服务节点变化,每次watcher失效以后需要重新注册watcher事件,因为命名服务一直监听命名服务节点下的变化,因此可以通过zoo_get_children方法进行watcher的重新注册的功能
// 可通过 设置zoo_get_children的第三个参数 watch 为非0 达到重新注册的目的,注册的回调函数为zookeeper_init 传入的 zk_watcher_g 方法
ZOOAPI int zoo_get_children(zhandle_t *zh, const char *path, int watch,
struct String_vector *strings)
static void zk_watcher_g(zhandle_t* zh, int type, int state, const char* path,
void* watcherCtx) {
ZookeeperClient *self = (ZookeeperClient *)watcherCtx;
if (type == ZOO_CHILD_EVENT) {
std::lock_guard<std::mutex> lock(self->mutex_);
zoo_get_children(zh, path, 1, &self->children_names_);
self->is_changed_ = true;
} else if (ZOO_SESSION_EVENT == type) {
if (ZOO_CONNECTED_STATE == state) {
LOG(INFO) << "Connected to ZooKeeper.";
} else if (ZOO_EXPIRED_SESSION_STATE == state) {
LOG(INFO) << "ZooKeeper session expired.";
// TODO: Handle session expiration
// do reconect
for (int i = 0; i < 10; i++) {
if (0 == self->Connect()) {
break;
}
LOG(WARNING) << "Do zookeeper reconnect, times:" << i;
sleep(5);
}
}
}
};
zookeeper命名服务集成到BRPC
命名服务集成到BRPC 的NamingService框架只需要集成NamingService然后重写父类方法即可,最好通过注册的方式,集成到框架中
第一步:继承brpc::NamingService 类
class ZooKeeperNamingService : public brpc::PeriodicNamingService {}
第二步:重写父类方法实现自定义处理逻辑
// 持续获取命名服务节点下的服务器列表,根据各自的业务需求实现列表获取逻辑
// 列表通过 zk watcher 持续监听变化更新
int GetServers(const char *service_name, std::vector<brpc::ServerNode> *servers) override;
// 服务列表更新时间间隔,也就是框架调用GetServers方法的时间间隔
int GetNamingServiceAccessIntervalMs() const override;
void Describe(std::ostream &os, const brpc::DescribeOptions &) const override;
// 生成 命名服务对象
NamingService *New() const override;
void Destroy() override;
第三步:集成到BRPC的NamingService框架
// 设置ZK连接信息
ZKNamingService::ZKConfig zk_config("127.0.0.1:2181", "",
"test", "ns_test", 1000);
// 初始化内部ZK连接信息
ZKNamingService::ZooKeeperNamingService zkns(zk_config, 2000);
// 将命名服务注册到 NamingService框架,该步骤只是将命名服务类注册到框架中,并没有实际执行,
// 在channel.Init()时通过brpc::NamingService* ZooKeeperNamingService::New();方法重新生成对象并执行相关逻辑
brpc::NamingServiceExtension()->RegisterOrDie("zk", &zkns);
brpc::Channel channel;
// Initialize the channel, NULL means using default options.
brpc::ChannelOptions options;
options.timeout_ms = 1000 /*milliseconds*/;
options.max_retry = 1;
// 使用命名服务
if (channel.Init("zk://ns_test", "rr", &options) != 0) {
LOG(ERROR) << "Fail to initialize channel";
return -1;
}