Divide插件功能介绍
Divide插件是进行http正向代理的插件,所有http类型的请求,都由该插件进行负载均衡调用。
-
负载均衡:
- 随机(带权重):性能高,但均衡差一些
- 轮询(带权重):性能较随机差一些,但均衡性好
- 一致性Hash:同一个客户端的IP请求,始终会被同一个Server处理
服务探活
上篇已经对负载均衡轮询策略进行了分析。这篇分析另外两个策略是随机和一致性Hash。
负载均衡
随机
随机算法比较简单,就是随机选取其中一个Server地址。
public class RandomLoadBalance extends AbstractLoadBalance {
private static final Random RANDOM = new Random();
@Override
public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
// 计算所有上游服务加起来权重值
int totalWeight = calculateTotalWeight(upstreamList);
// 是否所有上游服务的权重是一样
boolean sameWeight = isAllUpStreamSameWeight(upstreamList);
// 如果它们权重不一样
if (totalWeight > 0 && !sameWeight) {
// 根据权重值,去随机选择一个上游服务
return random(totalWeight, upstreamList);
}
// 如果权重一样的,那么就随机选择一个上游服务
return random(upstreamList);
}
// 所有上游服务的权重是否一样
private boolean isAllUpStreamSameWeight(final List<DivideUpstream> upstreamList) {
boolean sameWeight = true;
int length = upstreamList.size();
// 遍历所有上游服务,当前节点和上一个节点的权重进行比较
for (int i = 0; i < length; i++) {
int weight = getWeight(upstreamList.get(i));
if (i > 0 && weight != getWeight(upstreamList.get(i - 1))) {
// Calculate whether the weight of ownership is the same
sameWeight = false;
break;
}
}
return sameWeight;
}
// 所有上游服务的权重值进行求和
private int calculateTotalWeight(final List<DivideUpstream> upstreamList) {
// ...
}
private DivideUpstream random(final int totalWeight, final List<DivideUpstream> upstreamList) {
// 随机选择一个权重值,totalWeight是所有上游服务的权重之和。
int offset = RANDOM.nextInt(totalWeight);
// 遍历所有上游服务
for (DivideUpstream divideUpstream : upstreamList) {
// 随机的权重值减去该上游服务的权重值
offset -= getWeight(divideUpstream);
// 如果小于0,那么就选中该上游服务
if (offset < 0) {
return divideUpstream;
}
}
return upstreamList.get(0);
}
}
一致性Hash
能保证同一个来源的请求总是在同一个服务器上处理,实现会话粘滞。
一致性Hash算法在集群系统中是很常见的算法,
public class HashLoadBalance extends AbstractLoadBalance {
// 虚拟节点数量
private static final int VIRTUAL_NODE_NUM = 5;
@Override
public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
// 创建跳表Map结构容器
final ConcurrentSkipListMap<Long, DivideUpstream> treeMap = new ConcurrentSkipListMap<>();
// 遍历上游服务列表
for (DivideUpstream address : upstreamList) {
// 在一致性HASH环上创建虚拟节点
for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
// 根据URL和虚拟节点index创建hash值
long addressHash = hash("SOUL-" + address.getUpstreamUrl() + "-HASH-" + i);
// hash做为key,URL做为value 保存到容器中
treeMap.put(addressHash, address);
}
}
// 获取客户端的IP的hash值
long hash = hash(String.valueOf(ip));
// 根据hash值(客户端IP的)做为key查找最近的上游服务的URL
SortedMap<Long, DivideUpstream> lastRing = treeMap.tailMap(hash);
if (!lastRing.isEmpty()) {
// 如果不为空,获取URL
return lastRing.get(lastRing.firstKey());
}
// 如果没找到,就取第一个URL地址
return treeMap.firstEntry().getValue();
}
// hash函数
private static long hash(final String key) {
// ...
}
}
服务探活
服务探活也是网关常用的功能之一,如果某一台上游服务不可用,我们就把这台服务踢出去,每次请求过来就不会去映射到这个地址。保证系统高可用。
看一下Soul网关是如何实现,代码如下:
public final class UpstreamCacheManager {
private UpstreamCacheManager() {
// 是否开启单独的线程去服务探活
boolean check = Boolean.parseBoolean(System.getProperty("soul.upstream.check", "false"));
if (check) {
// 创建定时任务线程池,线程数为1。
new ScheduledThreadPoolExecutor(1, SoulThreadFactory.create("scheduled-upstream-task", false))
.scheduleWithFixedDelay(this::scheduled,
30,
// 默认是每30s去探测一次
Integer.parseInt(System.getProperty("soul.upstream.scheduledTime", "30")), TimeUnit.SECONDS);
}
}
// 获取可用的上游服务列表
public List<DivideUpstream> findUpstreamListBySelectorId(final String selectorId) {
return UPSTREAM_MAP_TEMP.get(selectorId);
}
// 服务探活任务
private void scheduled() {
if (UPSTREAM_MAP.size() > 0) {
// 遍历所有上游服务列表
UPSTREAM_MAP.forEach((k, v) -> {
// 获取到可用的上游服务列表
List<DivideUpstream> result = check(v);
if (result.size() > 0) {
// 有可用的 put
UPSTREAM_MAP_TEMP.put(k, result);
} else {
// 不可用 remove
UPSTREAM_MAP_TEMP.remove(k);
}
});
}
}
private List<DivideUpstream> check(final List<DivideUpstream> upstreamList) {
List<DivideUpstream> resultList = Lists.newArrayListWithCapacity(upstreamList.size());
// 遍历上游服务列表
for (DivideUpstream divideUpstream : upstreamList) {
// 服务是否存活
final boolean pass = UpstreamCheckUtils.checkUrl(divideUpstream.getUpstreamUrl());
if (pass) {
// 如果服务上次的状态是不可用的,则更新为可用的状态,检测时间更新
if (!divideUpstream.isStatus()) {
divideUpstream.setTimestamp(System.currentTimeMillis());
divideUpstream.setStatus(true);
}
// 添加到列表
resultList.add(divideUpstream);
} else {
// 服务不可用,更新状态
divideUpstream.setStatus(false);
}
}
return resultList;
}
}
如何判断服务是否可活呢
public class UpstreamCheckUtils {
public static boolean checkUrl(final String url) {
if (StringUtils.isBlank(url)) {
return false;
}
// 如果是http或https协议的url
if (checkIP(url)) {
String[] hostPort;
if (url.startsWith(HTTP)) {
final String[] http = StringUtils.split(url, "\\/\\/");
hostPort = StringUtils.split(http[1], Constants.COLONS);
} else {
hostPort = StringUtils.split(url, Constants.COLONS);
}
// 创建socket connection
return isHostConnector(hostPort[0], Integer.parseInt(hostPort[1]));
} else {
// 这个URL是否可到达
return isHostReachable(url);
}
}
private static boolean isHostConnector(final String host, final int port) {
// 创建Socket
try (Socket socket = new Socket()) {
// connect
socket.connect(new InetSocketAddress(host, port));
} catch (IOException e) {
return false;
}
return true;
}
private static boolean isHostReachable(final String host) {
try {
return InetAddress.getByName(host).isReachable(1000);
} catch (IOException ignored) {
}
return false;
}
}
总结
学习了基于一致性Hash和加权随机的负载均衡的算法。
学习了服务探活功能。