基于brpc的轻量级服务注册中心设计与实现

在微服务架构日益普及的今天,服务之间的高效通信与动态发现已成为构建高可用分布式系统的核心挑战。面对成百上千个可能随时上下线的微服务,如何确保消费者能够快速、准确地找到可用的服务提供者?服务注册中心正是解决这一问题的关键所在。

brpc 通过 NamingService 支持 consul、zookeeper 等作为服务注册中心。有时候可能系统并不需要现有服务注册中心功能太过复杂,同时可以对接自己的持久化系统,实现简单的服务注册和服务发现功能。

一、整体架构设计:三大角色协同作业

基于brpc框架实现的轻量级服务注册中心构建了一个完整的服务治理闭环,系统包含三个核心组件:

  • RegistryServer(注册中心):作为系统核心,负责维护和管理服务注册表
  • ServiceProvider(服务提供者):向注册中心注册自身服务信息并提供实际服务能力
  • ServiceConsumer(服务消费者):从注册中心获取服务列表并进行服务调用
image.png

从架构示意图可以看出,这些组件之间的交互关系清晰而高效:

  • ServiceProvider启动时向注册中心注册服务信息,并通过定期心跳保持连接
  • ServiceConsumer从注册中心获取可用服务列表,并定期更新
  • 消费者直接向提供者发起服务请求,注册中心不参与实际的数据传输

二、注册中心核心功能解析

  1. 服务注册表设计
    注册中心通过两个核心数据结构管理服务信息:
// 服务名到服务地址列表的映射
unordered_map<string, vector<registry::ServerAddress>> _service_map;
// 服务唯一标识到最后心跳时间的映射(用于健康检查)
unordered_map<string, time_t> _heartbeat_map;
  1. 四大核心接口:

注册中心通过RPC接口实现服务:

  • Register服务注册服务提供者启动时注册自身
  • Unregister服务注销服务提供者优雅退出时调用
  • Discover服务发现消费者获取可用服务列表
  • Heartbeat心跳检测维护服务实例的存活状态
    以注册接口为例:
void Register(google::protobuf::RpcController* cntl,
              const registry::RegisterRequest* request,
              registry::RegisterResponse* response,
              google::protobuf::Closure* done) override {
}
  1. 健康检查机制:保证服务列表实时准确
    为了保证服务列表的实时性和准确性,注册中心实现了完整的健康检查机制:

定期扫描:后台线程每10秒检查一次所有服务的心跳状态
超时剔除:超过30秒未收到心跳的服务会被自动移除
自动清理:当某个服务的所有实例都被移除后,该服务名也会从注册表中删除

void health_check() {
    while (!_stop_health_check) {
        this_thread::sleep_for(chrono::seconds(10));  // 每10秒检查一次
        // 1. 遍历检查所有服务的心跳状态
        // 2. 移除超时服务实例
        // 3. 更新服务注册表状态
    }
}

三、数据结构设计:简洁而完整

服务注册中心通过Protobuf定义了一套简洁而完整的数据结构:

// 服务地址信息
message ServerAddress {
    required string ip = 1;      // 服务IP地址
    required int32 port = 2;     // 服务端口号
}

// 注册中心核心服务接口
service RegistryService {
    rpc Register(RegisterRequest) returns (RegisterResponse);
    rpc Unregister(UnregisterRequest) returns (UnregisterResponse);
    rpc Discover(DiscoverRequest) returns (DiscoverResponse);
    rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse);
}

这种设计保证了组件间通信的标准化和可扩展性。

四、完整示例代码

#include <brpc/server.h>
#include <unordered_map>
#include <mutex>
#include <chrono>
#include <thread>
#include <ctime>
#include <iomanip>
#include <iostream>
#include "registry_service.pb.h"

using namespace std;
using namespace brpc;

// 日志输出宏(带时间戳)
#define LOG_INFO(msg) do { \
    auto now = chrono::system_clock::now(); \
    time_t now_time = chrono::system_clock::to_time_t(now); \
    cout << "[" << put_time(localtime(&now_time), "%Y-%m-%d %H:%M:%S") << "] [INFO] " << msg << endl; \
} while(0)

#define LOG_WARN(msg) do { \
    auto now = chrono::system_clock::now(); \
    time_t now_time = chrono::system_clock::to_time_t(now); \
    cerr << "[" << put_time(localtime(&now_time), "%Y-%m-%d %H:%M:%S") << "] [WARN] " << msg << endl; \
} while(0)

// 服务注册中心核心类,管理服务列表和健康检查
class RegistryServiceImpl : public registry::RegistryService {
public:
    RegistryServiceImpl() {
        // 启动健康检查线程(定期清理超时服务)
        _health_check_thread = thread(&RegistryServiceImpl::health_check, this);
        LOG_INFO("Registry service initialized, health check thread started");
    }

    ~RegistryServiceImpl() {
        _stop_health_check = true;
        if (_health_check_thread.joinable()) {
            _health_check_thread.join();
        }
        LOG_INFO("Registry service stopped");
    }

    // 服务注册
    void Register(google::protobuf::RpcController* cntl,
                  const registry::RegisterRequest* request,
                  registry::RegisterResponse* response,
                  google::protobuf::Closure* done) override {
        brpc::ClosureGuard done_guard(done);
        brpc::Controller* c = static_cast<brpc::Controller*>(cntl);

        lock_guard<mutex> lock(_mutex);
        const string& service_name = request->service_name();
        const auto& addr = request->address();
        string addr_str = addr.ip() + ":" + to_string(addr.port());

        // 验证参数
        if (service_name.empty() || addr.ip().empty() || addr.port() <= 0) {
            response->set_status(registry::RegisterResponse::INVALID_PARAM);
            response->set_message("Invalid service name or address");
            LOG_WARN("Register failed (invalid param): service=" << service_name << ", address=" << addr_str);
            return;
        }

        // 检查是否已注册
        auto& servers = _service_map[service_name];
        for (const auto& s : servers) {
            if (s.ip() == addr.ip() && s.port() == addr.port()) {
                response->set_status(registry::RegisterResponse::ALREADY_EXISTS);
                response->set_message("Service already registered");
                LOG_WARN("Register failed (already exists): service=" << service_name << ", address=" << addr_str);
                return;
            }
        }

        // 添加服务并更新心跳时间
        servers.push_back(addr);
        _heartbeat_map[get_key(service_name, addr)] = time(nullptr);

        response->set_status(registry::RegisterResponse::SUCCESS);
        response->set_message("Register success");
        LOG_INFO("Service registered: service=" << service_name << ", address=" << addr_str);
        
        // 打印当前service_map状态
        print_service_map();
    }

    // 服务注销
    void Unregister(google::protobuf::RpcController* cntl,
                    const registry::UnregisterRequest* request,
                    registry::UnregisterResponse* response,
                    google::protobuf::Closure* done) override {
        brpc::ClosureGuard done_guard(done);

        lock_guard<mutex> lock(_mutex);
        const string& service_name = request->service_name();
        const auto& addr = request->address();
        string addr_str = addr.ip() + ":" + to_string(addr.port());

        auto it = _service_map.find(service_name);
        if (it == _service_map.end()) {
            response->set_status(registry::UnregisterResponse::NOT_FOUND);
            LOG_WARN("Unregister failed (not found): service=" << service_name << ", address=" << addr_str);
            return;
        }

        // 从服务列表中移除
        auto& servers = it->second;
        bool removed = false;
        for (auto iter = servers.begin(); iter != servers.end(); ++iter) {
            if (iter->ip() == addr.ip() && iter->port() == addr.port()) {
                servers.erase(iter);
                removed = true;
                break;
            }
        }

        if (removed) {
            _heartbeat_map.erase(get_key(service_name, addr));
            response->set_status(registry::UnregisterResponse::SUCCESS);
            LOG_INFO("Service unregistered: service=" << service_name << ", address=" << addr_str);
            
            // 如果服务列表为空,从map中移除该服务名
            if (servers.empty()) {
                _service_map.erase(it);
                LOG_INFO("Service entry removed (no addresses left): service=" << service_name);
            }
            
            // 打印当前service_map状态
            print_service_map();
        } else {
            response->set_status(registry::UnregisterResponse::NOT_FOUND);
            LOG_WARN("Unregister failed (not found): service=" << service_name << ", address=" << addr_str);
        }
    }

    // 服务发现
    void Discover(google::protobuf::RpcController* cntl,
                  const registry::DiscoverRequest* request,
                  registry::DiscoverResponse* response,
                  google::protobuf::Closure* done) override {
        brpc::ClosureGuard done_guard(done);

        lock_guard<mutex> lock(_mutex);
        const string& service_name = request->service_name();

        auto it = _service_map.find(service_name);
        if (it != _service_map.end()) {
            // 返回所有可用服务地址
            for (const auto& addr : it->second) {
                *response->add_addresses() = addr;
            }
            LOG_INFO("Service discovered: service=" << service_name << ", count=" << it->second.size());
        } else {
            LOG_WARN("Service not found during discovery: service=" << service_name);
        }
    }

    // 心跳检测(更新服务存活状态)
    void Heartbeat(google::protobuf::RpcController* cntl,
                   const registry::HeartbeatRequest* request,
                   registry::HeartbeatResponse* response,
                   google::protobuf::Closure* done) override {
        brpc::ClosureGuard done_guard(done);

        lock_guard<mutex> lock(_mutex);
        const string service_name = request->service_name();
        const auto& addr = request->address();
        const string key = get_key(service_name, addr);
        string addr_str = addr.ip() + ":" + to_string(addr.port());

        if (_heartbeat_map.count(key)) {
            _heartbeat_map[key] = time(nullptr);  // 更新心跳时间
            response->set_status(registry::HeartbeatResponse::SUCCESS);
            LOG_INFO("Heartbeat received: service=" << service_name << ", address=" << addr_str);
        } else {
            response->set_status(registry::HeartbeatResponse::NOT_REGISTERED);
            LOG_WARN("Heartbeat failed (not registered): service=" << service_name << ", address=" << addr_str);
        }
    }

private:
    // 生成服务唯一标识(服务名+IP+端口)
    string get_key(const string& service_name, const registry::ServerAddress& addr) {
        return service_name + "|" + addr.ip() + ":" + to_string(addr.port());
    }

    // 打印当前service_map中的所有服务信息
    void print_service_map() {
        LOG_INFO("Current service map status:");
        if (_service_map.empty()) {
            LOG_INFO("  No services registered");
            return;
        }
        for (const auto& entry : _service_map) {
            LOG_INFO("  Service: " << entry.first << " (count=" << entry.second.size() << ")");
            for (const auto& addr : entry.second) {
                LOG_INFO("    Address: " << addr.ip() << ":" << addr.port() << ", weight=" << addr.weight());
            }
        }
    }

    // 健康检查线程:清理超过30秒未心跳的服务
    void health_check() {
        while (!_stop_health_check) {
            this_thread::sleep_for(chrono::seconds(10));  // 每10秒检查一次
            lock_guard<mutex> lock(_mutex);

            const time_t now = time(nullptr);
            vector<string> expired_keys;

            // 找出超时服务
            for (const auto& p : _heartbeat_map) {
                if (now - p.second > 30) {  // 30秒超时
                    expired_keys.push_back(p.first);
                }
            }

            // 移除超时服务
            if (!expired_keys.empty()) {
                LOG_INFO("Health check: found " << expired_keys.size() << " expired service(s)");
                for (const string& key : expired_keys) {
                    _heartbeat_map.erase(key);
                    // 解析key获取服务名和地址
                    size_t pos1 = key.find('|');
                    size_t pos2 = key.find(':', pos1 + 1);
                    if (pos1 == string::npos || pos2 == string::npos) {
                        LOG_WARN("Invalid key format during cleanup: " << key);
                        continue;
                    }

                    string service_name = key.substr(0, pos1);
                    string ip = key.substr(pos1 + 1, pos2 - pos1 - 1);
                    int port = stoi(key.substr(pos2 + 1));
                    string addr_str = ip + ":" + to_string(port);

                    // 从服务列表中删除
                    auto it = _service_map.find(service_name);
                    if (it != _service_map.end()) {
                        auto& servers = it->second;
                        for (auto iter = servers.begin(); iter != servers.end(); ++iter) {
                            if (iter->ip() == ip && iter->port() == port) {
                                servers.erase(iter);
                                LOG_INFO("Expired service removed: service=" << service_name << ", address=" << addr_str);
                                break;
                            }
                        }
                        // 如果服务列表为空,从map中移除该服务名
                        if (servers.empty()) {
                            _service_map.erase(it);
                            LOG_INFO("Service entry removed (no addresses left): service=" << service_name);
                        }
                    }
                }
                // 打印清理后的service_map状态
                print_service_map();
            } else {
                LOG_INFO("Health check: no expired services");
            }
        }
    }

    mutex _mutex;
    // 服务名 -> 服务地址列表映射
    unordered_map<string, vector<registry::ServerAddress>> _service_map;
    // 服务唯一标识 -> 最后心跳时间映射(用于健康检查)
    unordered_map<string, time_t> _heartbeat_map;
    thread _health_check_thread;
    atomic<bool> _stop_health_check{false};
};

int main(int argc, char* argv[]) {
    // 初始化服务器
    brpc::Server server;
    RegistryServiceImpl registry_service;

    // 注册服务
    if (server.AddService(&registry_service, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
        cerr << "Failed to add registry service" << endl;
        return -1;
    }

    // 设置服务器地址
    brpc::ServerOptions options;
    options.idle_timeout_sec = 300;
    if (server.Start(18888, &options) != 0) {  // 注册中心监听8888端口
        cerr << "Failed to start registry server" << endl;
        return -1;
    }

    LOG_INFO("Registry server started on port 18888");
    server.RunUntilAskedToQuit();
    return 0;
}

五、高可用与扩展性考虑

虽然这个轻量级注册中心已经实现了核心功能,但在生产环境中还需要考虑以下扩展方案:

  1. 注册中心集群化
    使用braft等框架实现注册中心集群,保证数据一致性
    采用主备模式或多活模式,避免单点故障

  2. 数据持久化
    将服务列表持久化到高性能存储系统
    实现定期快照和操作日志记录,支持快速恢复

  3. 性能优化策略
    服务列表本地缓存,减少注册中心访问压力
    批量操作接口,提高大批量服务注册/注销效率

总结

基于brpc实现的轻量级服务注册中心提供了一个简单而高效的解决方案,特别适合那些不需要复杂功能但要求高性能和可靠性的微服务场景。通过核心的注册表管理、健康检查机制和简洁的接口设计,该系统能够有效地解决服务动态发现的问题。同时,通过集群化、持久化和性能优化等扩展方案,这个轻量级系统完全可以满足生产环境的高可用和高性能要求。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容