众所周知,要请求一个服务,必须要知道服务的网络地址(IP和端口)。随着微服务的发展,越来越多的用户、请求和需求使得请求服务这项工作变得非常困难。在基于云原生的微服务时代,我们的服务由于各种情况会经常发生变更,例如自动伸缩、升级和故障。因为这些变化,服务实例会不断获取新的IP。
这就是服务发现进入微服务场景的地方。我们需要某种系统能定时跟踪所有服务,并更新服务的IP/端口,这样客户端就可以无缝请求到服务。
服务发现在观念上很简单:核心组件就是服务注册,本质就是存储服务网络地址(IP/端口)的数据库。这种机制在服务实例启动和停止时更新服务注册表。
有两种主流的服务发现方式:
- 服务端和客户端直接与服务注册组件通信。
- 部署基础设施例如K8S等,来处理服务发现。
本文将使用第三方注册模式来实现我们的服务发现功能。这种模式无需服务自身主动注册,通过第三方registar模块来完成(该模块定时调docker ps -a来获取服务信息进行注册)。
反向代理
为了实现反向代理,这里使用httputil包。主要目的是为了提供负载均衡。为了实现客户端请求以轮询方式路由到服务端,我们采用简单的算法,根据请求总数对注册服务数量取模,可以很简单的找到服务端并代理请求。
package main
import (
"fmt"
"net/http"
"sync/atomic"
)
type Application struct {
RequestCount uint64
SRegistry *ServiceRegistry
}
func (a *Application) Handle(w http.ResponseWriter, r *http.Request) {
atomic.AddUint64(&a.RequestCount, 1)
if a.SRegistry.Len() == 0 {
w.Write([]byte(`No backend entry in the service registry`))
return
}
//请求数对服务实例个数取模
backendIndex := int(atomic.LoadUint64(&a.RequestCount) % uint64(a.SRegistry.Len()))
fmt.Printf("Request routing to instance %d\n", backendIndex)
//将请求代理转发到对应的服务端
a.SRegistry.GetByIndex(backendIndex).
proxy.
ServeHTTP(w, r)
}
上面的代码很简单,定义Application结构体,包含RequestCount请求总数和SRegistry服务注册字段。定义请求处理函数Handle,根据请求总数和后端服务实例个数取模,调用GetByIndex函数读取服务IP和端口,将请求代理转发到对应的后端服务。
Registar
使用time.Tick实现定时发现服务变化。这里我们是一个服务实例对应一个容器。每当定时到了,执行docker ps -a使用docker官方SDK来读取服务变化。使用-a参数是为了知道哪个容器退出了,需要将它从服务注册列表中删除。如果有新的容器增加且是运行状态,检查是否已注册到服务列表,如果没有就将其添加到注册列表中。
package main
import (
"context"
"fmt"
"github.com/docker/docker/api/types"
"github.com/docker/docker/client"
"time"
)
type Registrar struct {
Interval time.Duration
DockerCLI *client.Client
SRegistry *ServiceRegistry
}
const (
HelloServiceImageName = "hello" //后端服务实例docker镜像名称
ContainerRunningState = "running" //服务运行状态
)
func (r *Registrar) Observe() {
for range time.Tick(r.Interval) { //定时器
//获取容器列表
cList, _ := r.DockerCLI.ContainerList(context.Background(), types.ContainerListOptions{
All: true,
})
//没有容器运行,意味着没有后端服务可用,清空注册列表
if len(cList) == 0 {
r.SRegistry.RemoveAll()
continue
}
//镜像过滤名称不是hello的容器,也就是指定服务
for _, c := range cList {
if c.Image != HelloServiceImageName {
continue
}
//根据容器ID查找该后端服务是否已经注册
_, exist := r.SRegistry.GetByContainerID(c.ID)
if c.State == ContainerRunningState {
//容器运行但是为注册,执行注册操作
if !exist {
addr := fmt.Sprintf("http://localhost:%d", c.Ports[0].PublicPort)
r.SRegistry.Add(c.ID, addr)
}
} else {
//容器不是运行状态,但已注册需移除
if exist {
r.SRegistry.RemoveByContainerID(c.ID)
}
}
}
}
}
上面的代码Observe方法,主要是定时调docker ps -a命令,根据镜像名称HelloServiceImageName来锁定我们的服务实例,同一类服务使用的相同的镜像。根据前面讨论的结论,容器正常运行就考虑添加到注册列表,否则移除。
服务注册
服务注册是一个非常基本的结构体切片,使用sync.RWMutex来实现并发同步,保存所有的正常服务实例列表。会定时更新列表。
package main
import (
"fmt"
"net/http/httputil"
"net/url"
"sync"
)
//定义后端服务结构体
type backend struct {
proxy *httputil.ReverseProxy //代理转发
containerID string //容器ID
}
//服务注册结构体
type ServiceRegistry struct {
mu sync.RWMutex
backends []backend
}
//初始化
func (s *ServiceRegistry) Init() {
s.mu = sync.RWMutex{}
s.backends = []backend{} //默认服务列表为空
}
//向服务列表添加服务,也即是注册服务
func (s *ServiceRegistry) Add(containerID, addr string) {
s.mu.Lock()
defer s.mu.Unlock()
URL, _ := url.Parse(addr)
s.backends = append(s.backends, backend{
//根据后端服务创建代理对象,用于转发请求
proxy: httputil.NewSingleHostReverseProxy(URL),
containerID: containerID,
})
}
//根据容器ID查询注册列表,支持并发
func (s *ServiceRegistry) GetByContainerID(containerID string) (backend, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
for _, b := range s.backends {
if b.containerID == containerID {
return b, true
}
}
return backend{}, false
}
//根据容器ID读取后端服务实例
func (s *ServiceRegistry) GetByIndex(index int) backend {
s.mu.RLock()
defer s.mu.RUnlock()
return s.backends[index]
}
//根据容器ID移除服务
func (s *ServiceRegistry) RemoveByContainerID(containerID string) {
s.mu.Lock()
defer s.mu.Unlock()
var backends []backend
for _, b := range s.backends {
if b.containerID == containerID {
continue
}
backends = append(backends, b)
}
s.backends = backends
}
//清除服务注册列表
func (s *ServiceRegistry) RemoveAll() {
s.mu.Lock()
defer s.mu.Unlock()
s.backends = []backend{}
}
//获取服务实例个数
func (s *ServiceRegistry) Len() int {
s.mu.RLock()
defer s.mu.RUnlock()
return len(s.backends)
}
//打印服务列表
func (s *ServiceRegistry) List() {
s.mu.RLock()
defer s.mu.RUnlock()
for i := range s.backends {
fmt.Println(s.backends[i].containerID)
}
}
源代码地址:
https://github.com/Abdulsametileri/simple-service-discovery
总结:
本文有几个知识点值得了解下:
- 服务发现的基本逻辑和实现方式。
- 使用httputil包来实现请求代理转发,实现反向代理功能。
- 列表的并发读取同步。
- 简单的轮询算法实现方式。