在 SOFABolt 中提供了三种连接监控机制:重连、自定义连接监控(默认实现是自动断连)与预留连接泄露监控。
通常 RPC 调用过程,是不需要断链与重连的。因为每次 RPC 调用过程,都会校验是否有可用连接,如果没有则新建一个。但有一些场景,是需要断链和保持长连接的:
- 自动断连:比如通过 LVS VIP 或者 F5 建立多个连接的场景,因为网络设备的负载均衡机制,有可能某一些连接固定映射到了某几台后端的 RS 上面,此时需要自动断连,然后重连,靠建连过程的随机性来实现最终负载均衡。注意,开启了自动断连的场景,通常需要配合重连使用。
- 重连:比如客户端发起建连后,由服务端来通过双工通信,发起请求到客户端。此时如果没有重连机制,则无法实现(因为服务端向客户端发起请求时,不会做建连操作)。而且就算是客户端向服务端发起请求,由于建连的过程需要耗时,如果重连机制可以提前重建这些连接,那么可以省去建连的耗时。
以上内容摘自:SOFABolt 用户手册
一、重连机制(只有客户端有,服务端不建连)
- 首先开启重连开关(系统级别或者实例级别)
- RpcClient 初始化时创建重连管理器实例,重连管理器创建的过程会自动启动重连线程,重连线程对重连管理器中的待连接队列进行扫描,调用连接管理器 api 来重建连接
- 在 channelInactive 时将当前连接的 url 放到待重连队列中(后续被重连线程扫描)
使用姿势
// System-level switch: the property must be set before new RpcClient(),
// because GlobalSwitch is initialized while the RpcClient is constructed.
System.setProperty(Configs.CONN_RECONNECT_SWITCH, "true");
client = new RpcClient();
// Instance-level switch.
client.enableReconnectSwitch();
// The most primitive instance-level way of turning the switch on.
client.switches().turnOn(GlobalSwitch.CONN_RECONNECT_SWITCH);
client.init();
源码分析
============================ RpcClient ============================
public void init() {
...
// Only set up reconnection when the reconnect switch is on.
if (switches().isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {
// Create the reconnect manager; its constructor starts the reconnect thread.
reconnectManager = new ReconnectManager(connectionManager);
// Hand the manager to the connection event handler.
connectionEventHandler.setReconnectManager(reconnectManager);
}
}
============================ ReconnectManager ============================
/**
 * Rebuilds connections in the background.
 *
 * <p>Urls are queued through {@link #addReconnectTask(Url)}; a dedicated thread, started by the
 * constructor, takes them one at a time and asks the {@code ConnectionManager} to create or heal
 * the connections for that url. Failed tasks are put back on the queue and retried.
 */
public class ReconnectManager {
    /** A pending reconnect request for one url. */
    class ReconnectTask {
        Url url;
    }

    /** Tasks waiting to be reconnected. */
    private final LinkedBlockingQueue<ReconnectTask> tasks = new LinkedBlockingQueue<ReconnectTask>();
    /** Urls whose queued tasks must be skipped instead of reconnected. */
    protected final List<Url> canceled = new CopyOnWriteArrayList<Url>();
    /** Loop flag of the reconnect thread; set in the constructor and cleared by {@link #stop()}. */
    private volatile boolean started;
    /** Throttle interval in milliseconds used by the reconnect thread. */
    private int healConnectionInterval = 1000;
    private final Thread healConnectionThreads;
    private ConnectionManager connectionManager;

    /**
     * Creates the manager and immediately starts the reconnect thread.
     *
     * @param connectionManager manager used to create or heal connections
     */
    public ReconnectManager(ConnectionManager connectionManager) {
        this.connectionManager = connectionManager;
        this.healConnectionThreads = new Thread(new HealConnectionRunner());
        // The flag must be set before the thread starts, otherwise the loop would exit at once.
        this.started = true;
        this.healConnectionThreads.start();
    }

    /** Performs the actual reconnect for one task. */
    private void doReconnectTask(ReconnectTask task) throws InterruptedException, RemotingException {
        connectionManager.createConnectionAndHealIfNeed(task.url);
    }

    /** Puts an existing task (back) on the pending queue. */
    private void addReconnectTask(ReconnectTask task) {
        tasks.add(task);
    }

    /** Marks a url as canceled so that queued tasks for it are skipped. */
    public void addCancelUrl(Url url) {
        canceled.add(url);
    }

    /** Removes a url from the canceled list so that it may be reconnected again. */
    public void removeCancelUrl(Url url) {
        canceled.remove(url);
    }

    /**
     * Queues a reconnect task for the given url.
     *
     * @param url address whose connections should be rebuilt
     */
    public void addReconnectTask(Url url) {
        ReconnectTask task = new ReconnectTask();
        task.url = url;
        tasks.add(task);
    }

    /** Returns {@code true} when the task's url has not been canceled. */
    private boolean isValidTask(ReconnectTask task) {
        return !canceled.contains(task.url);
    }

    /** Stops the reconnect thread and drops all pending and canceled entries. */
    public void stop() {
        // Nothing to do when the manager is not running.
        if (!this.started) {
            return;
        }
        this.started = false;
        // Wake the thread up if it is sleeping or blocked on the queue.
        healConnectionThreads.interrupt();
        this.tasks.clear();
        this.canceled.clear();
    }

    /** Loop body of the reconnect thread. */
    private final class HealConnectionRunner implements Runnable {
        /** Duration in milliseconds of the previous reconnect attempt, -1 before the first one. */
        private long lastConnectTime = -1;

        @Override
        public void run() {
            while (ReconnectManager.this.started) {
                long start = -1;
                ReconnectTask task = null;
                try {
                    // Throttle: sleep for one interval unless the previous attempt already
                    // took at least that long.
                    if (this.lastConnectTime < ReconnectManager.this.healConnectionInterval) {
                        Thread.sleep(ReconnectManager.this.healConnectionInterval);
                    }
                    // Blocks until a task is available.
                    task = ReconnectManager.this.tasks.take();
                    start = System.currentTimeMillis();
                    // Canceled urls are taken off the queue but not reconnected.
                    if (ReconnectManager.this.isValidTask(task)) {
                        ReconnectManager.this.doReconnectTask(task);
                    }
                    this.lastConnectTime = System.currentTimeMillis() - start;
                } catch (Exception e) {
                    // Any failure (including interruption) ends up here; retry the task later.
                    retryWhenException(start, task, e);
                }
            }
        }

        /**
         * Records the elapsed time of the failed attempt and re-queues the task.
         *
         * @param start time the attempt started, or -1 when no task had been taken yet
         * @param task  the failed task, or {@code null} when the failure happened before a task was taken
         * @param e     the failure that triggered the retry
         */
        private void retryWhenException(long start, ReconnectTask task, Exception e) {
            if (start != -1) {
                this.lastConnectTime = System.currentTimeMillis() - start;
            }
            // When sleep() or take() is interrupted no task has been taken yet. Adding null to a
            // LinkedBlockingQueue throws NullPointerException, which would escape run() and kill
            // the reconnect thread, so only re-queue a real task.
            if (task != null) {
                ReconnectManager.this.addReconnectTask(task);
            }
        }
    }
}
============================ DefaultConnectionManager ============================
/**
 * Looks up the connection pool for the url, creating it when it is absent, and then heals the
 * pool if it holds fewer connections than expected.
 *
 * @param url target address; its unique key identifies the pool
 */
public void createConnectionAndHealIfNeed(Url url) {
    // Same entry point as ordinary connection creation: an absent pool is created here,
    // while an existing pool is returned as-is and may therefore need healing.
    final String poolKey = url.getUniqueKey();
    final ConnectionPool existingOrNew = this.getConnectionPoolAndCreateIfAbsent(poolKey,
        new ConnectionPoolCall(url));
    if (existingOrNew == null) {
        return;
    }
    healIfNeed(existingOrNew, url);
}
/**
 * Runs a one-off heal task when the pool holds fewer connections than the url expects.
 *
 * @param pool the connection pool to top up
 * @param url  target address; supplies the pool key and the expected connection count
 */
private void healIfNeed(ConnectionPool pool, Url url) {
    String poolKey = url.getUniqueKey();
    // Heal only when asynchronous creation has finished and the pool is below the expected size.
    // NOTE(review): open question from the original notes - synchronously created connections
    // either succeed or throw at creation time; is a failed one never reconnected afterwards?
    if (pool.isAsyncCreationDone() && pool.size() < url.getConnNum()) {
        FutureTask<Integer> task = this.healTasks.get(poolKey);
        // The map only de-duplicates concurrent heal attempts; the entry is removed after one run.
        if (null == task) {
            FutureTask<Integer> newTask = new FutureTask<Integer>(new HealConnectionCall(url, pool));
            task = this.healTasks.putIfAbsent(poolKey, newTask);
            if (null == task) {
                // This thread registered the task, so it runs it. Keep the local reference
                // instead of re-reading the map: a concurrent remove(poolKey) could make a
                // second lookup return null.
                task = newTask;
                task.run();
            }
        }
        // Wait until the heal task (ours or another thread's) has completed.
        task.get();
        // The heal task is one-off: drop it from the cache right after it has run.
        this.healTasks.remove(poolKey);
    }
}
// Callable executed by a heal task: creates connections for the pool and reports the pool size.
private class HealConnectionCall implements Callable<Integer> {
...
public Integer call() throws Exception {
// Create connections (same path as normal connection creation, not analyzed again here).
doCreate(this.url, this.pool, this.getClass().getSimpleName(), 0);
// Return the number of connections now held by the pool.
return this.pool.size();
}
}
============================ ConnectionEventHandler ============================
// Invoked when a channel becomes inactive; queues the connection's url for reconnection
// when the reconnect switch is on.
public void channelInactive(ChannelHandlerContext ctx) {
...
super.channelInactive(ctx);
// Look up the Connection attribute attached to this channel.
Attribute attr = ctx.channel().attr(Connection.CONNECTION);
if (null != attr) {
// Is the reconnect switch turned on?
if (this.globalSwitch != null && this.globalSwitch.isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {
Connection conn = (Connection) attr.get();
if (reconnectManager != null) {
// Add the url of the current connection to the pending reconnect queue.
reconnectManager.addReconnectTask(conn.getUrl());
}
}
...
}
}
疑问:为什么同步创建连接的不需要重新建连?
二、自定义连接监控机制(只有客户端有)
- 首先开启监控开关(系统级别或者实例级别)
- RpcClient 初始化时选择监控策略 + 创建监控器实例,之后启动监控线程,监控线程对所有的连接池进行扫描,调用监控策略 api 来处理连接(默认策略是断开被选中的连接)
特别注意:由于 oneway 模式没有 InvokeFuture,并且默认情况下使用的监控器是断连器,所以会被断连器断掉连接,每次请求都要重新建连,所以在打开 CONN_MONITOR_SWITCH 的情况下,不建议使用 oneway 模式。
使用姿势
// System-level switch: the property must be set before new RpcClient(),
// because GlobalSwitch is initialized while the RpcClient is constructed.
System.setProperty(Configs.CONN_MONITOR_SWITCH, "true");
client = new RpcClient();
// Instance-level switch.
client.enableConnectionMonitorSwitch();
// The most primitive instance-level way of turning the switch on.
client.switches().turnOn(GlobalSwitch.CONN_MONITOR_SWITCH);
client.init();
源码分析
============================ 监控器(内部调用监控策略执行相应操作) ============================
/**
 * Periodically hands every connection pool of the connection manager to a
 * {@code ConnectionMonitorStrategy}.
 */
public class DefaultConnectionMonitor {
    private DefaultConnectionManager connectionManager;
    /** Strategy that decides what to do with the monitored pools. */
    private ConnectionMonitorStrategy strategy;
    private ScheduledThreadPoolExecutor executor;

    /** Creates the single-threaded scheduler and schedules the monitor task at a fixed rate. */
    public void start() {
        // Delay before the first run, in ms (10s by default according to the original notes).
        final long firstRunDelayMillis = ConfigManager.conn_monitor_initial_delay();
        // Interval between runs, in ms (3min by default according to the original notes).
        final long intervalMillis = ConfigManager.conn_monitor_period();
        this.executor = new ScheduledThreadPoolExecutor(1,
            new NamedThreadFactory("ConnectionMonitorThread", true),
            new ThreadPoolExecutor.AbortPolicy());
        this.executor.scheduleAtFixedRate(new MonitorTask(), firstRunDelayMillis, intervalMillis,
            TimeUnit.MILLISECONDS);
    }

    /** Purges the scheduler's queue and shuts the scheduler down. */
    public void destroy() {
        executor.purge();
        executor.shutdown();
    }

    /** One monitoring pass: passes all connection pools to the strategy, if one is set. */
    private class MonitorTask implements Runnable {
        public void run() {
            if (strategy == null) {
                return;
            }
            Map<String, RunStateRecordedFutureTask<ConnectionPool>> allPools = connectionManager.getConnPools();
            strategy.monitor(allPools);
        }
    }
}
============================ ConnectionMonitorStrategy ============================
// Strategy applied by the connection monitor to the connection pools it scans.
public interface ConnectionMonitorStrategy {
// Classifies the given connections into named groups.
Map<String, List<Connection>> filter(List<Connection> connections);
/**
* Add a set of connections to monitor.
* Note:
* The previous connections in monitor of this protocol,
* will be dropped by monitor automatically.
*/
void monitor(Map<String, RunStateRecordedFutureTask<ConnectionPool>> connPools);
}
============================ ScheduledDisconnectStrategy ============================
/**
 * Monitor strategy that closes connections on a schedule.
 *
 * <p>On every pass, for each pool that holds more than {@code CONNECTION_THRESHOLD} "service on"
 * connections, one of them is picked at random and marked "service off". The connection picked on
 * the previous pass, together with all other idle "service off" connections, is closed.
 */
public class ScheduledDisconnectStrategy implements ConnectionMonitorStrategy {
    /** Connection picked on the latest pass, keyed by pool key. */
    private Map<String, Connection> freshSelectConnections = new ConcurrentHashMap<String, Connection>();

    /**
     * Splits the connections into "service on" and "service off" groups.
     *
     * @param connections connections of one pool
     * @return map with the keys {@code CONN_SERVICE_STATUS_ON} and {@code CONN_SERVICE_STATUS_OFF}
     */
    @Override
    public Map<String, List<Connection>> filter(List<Connection> connections) {
        List<Connection> serviceOnConnections = new ArrayList<Connection>();
        List<Connection> serviceOffConnections = new ArrayList<Connection>();
        Map<String, List<Connection>> filteredConnections = new ConcurrentHashMap<String, List<Connection>>();
        for (Connection connection : connections) {
            // No status attribute: the connection is still in service.
            // Status attribute present: the connection counts as "off" only when it has no
            // pending invoke futures and was not picked on the latest pass.
            String serviceStatus = (String) connection.getAttribute(Configs.CONN_SERVICE_STATUS);
            if (serviceStatus != null) {
                if (connection.isInvokeFutureMapFinish()
                    && !freshSelectConnections.containsValue(connection)) {
                    serviceOffConnections.add(connection);
                }
            } else {
                serviceOnConnections.add(connection);
            }
        }
        filteredConnections.put(Configs.CONN_SERVICE_STATUS_ON, serviceOnConnections);
        filteredConnections.put(Configs.CONN_SERVICE_STATUS_OFF, serviceOffConnections);
        return filteredConnections;
    }

    /**
     * Runs one monitoring pass over all connection pools.
     *
     * @param connPools pool key to the future task that yields the pool
     */
    @Override
    public void monitor(Map<String, RunStateRecordedFutureTask<ConnectionPool>> connPools) {
        Iterator<Map.Entry<String, RunStateRecordedFutureTask<ConnectionPool>>> iter = connPools.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<String, RunStateRecordedFutureTask<ConnectionPool>> entry = iter.next();
            String poolKey = entry.getKey();
            ConnectionPool pool = FutureTaskUtil.getFutureTaskResult(entry.getValue(), logger);
            // Split the pool's connections into the "on" and "off" groups.
            List<Connection> connections = pool.getAll();
            Map<String, List<Connection>> filteredConnectons = this.filter(connections);
            List<Connection> serviceOnConnections = filteredConnectons.get(Configs.CONN_SERVICE_STATUS_ON);
            List<Connection> serviceOffConnections = filteredConnectons.get(Configs.CONN_SERVICE_STATUS_OFF);
            // CONNECTION_THRESHOLD is 3 by default according to the original notes.
            if (serviceOnConnections.size() > CONNECTION_THRESHOLD) {
                // Pick a random in-service connection, mark it "off" (so filter() sees it next
                // time) and remember it as this pass's pick. The previous pick for the same pool,
                // if any, becomes a candidate for closing.
                Connection freshSelectConnect = serviceOnConnections.get(random.nextInt(serviceOnConnections.size()));
                freshSelectConnect.setAttribute(Configs.CONN_SERVICE_STATUS, Configs.CONN_SERVICE_STATUS_OFF);
                Connection lastSelectConnect = freshSelectConnections.put(poolKey, freshSelectConnect);
                closeFreshSelectConnections(lastSelectConnect, serviceOffConnections);
            } else {
                // Not enough in-service connections: pick nothing new, only retire the previous pick.
                Connection lastSelectConnect = freshSelectConnections.remove(poolKey);
                closeFreshSelectConnections(lastSelectConnect, serviceOffConnections);
            }
            // Close every connection collected in the "off" group.
            for (Connection offConn : serviceOffConnections) {
                if (offConn.isFine()) {
                    offConn.close();
                }
            }
        }
    }

    /**
     * Adds the previously picked connection to the "off" group once it has no pending invoke
     * futures, waiting {@code RETRY_DETECT_PERIOD} once before giving up for this pass.
     *
     * @param lastSelectConnect     previous pick for the pool, or {@code null} when there was none
     * @param serviceOffConnections list of connections that will be closed by the caller
     */
    private void closeFreshSelectConnections(Connection lastSelectConnect, List<Connection> serviceOffConnections) {
        // Map.put / Map.remove return null when the pool had no previous pick (e.g. on the first pass).
        if (lastSelectConnect == null) {
            return;
        }
        if (lastSelectConnect.isInvokeFutureMapFinish()) {
            serviceOffConnections.add(lastSelectConnect);
            return;
        }
        try {
            // Give in-flight requests a chance to finish (5s according to the original notes).
            Thread.sleep(RETRY_DETECT_PERIOD);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller and skip the retry on this pass.
            Thread.currentThread().interrupt();
            return;
        }
        if (lastSelectConnect.isInvokeFutureMapFinish()) {
            serviceOffConnections.add(lastSelectConnect);
        }
    }
}
============================ RpcClient ============================
public void init() {
...
// Only set up monitoring when the connection monitor switch is on.
if (switches().isOn(GlobalSwitch.CONN_MONITOR_SWITCH)) {
// Choose the strategy and create the monitor: when no custom ConnectionMonitorStrategy
// has been provided, fall back to ScheduledDisconnectStrategy.
if (monitorStrategy == null) {
ScheduledDisconnectStrategy strategy = new ScheduledDisconnectStrategy();
connectionMonitor = new DefaultConnectionMonitor(strategy, this.connectionManager);
} else {
connectionMonitor = new DefaultConnectionMonitor(monitorStrategy, this.connectionManager);
}
// Start the monitor.
connectionMonitor.start();
}
...
}
三、预留连接泄露监控机制
SOFABolt 提供了一个后台扫描接口 Scannable,所有需要做资源清理等后台任务的接口都可以继承该接口来进行资源清理工作。
注意:该功能自动开启,不可关闭。
============================ Scannable ============================
// Implemented by components that want to be scanned periodically by a background task.
public interface Scannable {
// Performs one scan pass.
void scan();
}
============================ DefaultConnectionManager ============================
// interface ConnectionManager extends Scannable
/**
 * Scans every known connection pool and forgets pools that are empty and have not been
 * accessed for longer than DEFAULT_EXPIRE_TIME.
 */
public void scan() {
    if (null == this.connTasks || this.connTasks.isEmpty()) {
        return;
    }
    // Walk the keys of the connection-task map; the iterator lets expired entries be removed in place.
    for (Iterator<String> keys = this.connTasks.keySet().iterator(); keys.hasNext();) {
        String key = keys.next();
        // Resolve the pool behind this key.
        ConnectionPool current = this.getConnectionPool(this.connTasks.get(key));
        if (null == current) {
            continue;
        }
        // Let the pool run its own scan first.
        current.scan();
        // Drop the connection task when the pool is empty and has been idle for longer than
        // DEFAULT_EXPIRE_TIME (10min by default according to the original notes).
        if (current.isEmpty()
            && (System.currentTimeMillis() - current.getLastAccessTimestamp()) > DEFAULT_EXPIRE_TIME) {
            keys.remove();
        }
    }
}
============================ ConnectionPool ============================
/** Closes and removes every connection in this pool that is no longer usable. */
public void scan() {
    if (null == this.conns || this.conns.isEmpty()) {
        return;
    }
    for (Connection candidate : conns) {
        // Healthy connections are left untouched.
        if (candidate.isFine()) {
            continue;
        }
        // Close the underlying netty channel.
        candidate.close();
        // Remove the connection from the list and release one reference; the channel is closed
        // again once no reference is left.
        // NOTE(review): open question from the original notes - this looks like a duplicate close.
        this.removeAndTryClose(candidate);
    }
}
============================ RpcTaskScanner ============================
/** Background scanner that periodically calls scan() on every registered Scannable. */
public class RpcTaskScanner {
    /** Scheduler backed by a single background thread. */
    private ScheduledExecutorService scheduledService = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("RpcTaskScannerThread", true));
    /** Targets to scan on every pass. */
    private List<Scannable> scanList = new LinkedList<Scannable>();

    /** Schedules the scan pass: first run after 10s, then 10s after each pass completes. */
    public void start() {
        final Runnable scanAllTargets = new Runnable() {
            public void run() {
                for (Scannable target : scanList) {
                    target.scan();
                }
            }
        };
        scheduledService.scheduleWithFixedDelay(scanAllTargets, 10000, 10000, TimeUnit.MILLISECONDS);
    }
}
============================ RpcClient ============================
// Background scanner used for the connection-leak check.
private RpcTaskScanner taskScanner = new RpcTaskScanner();
public void init() {
...
// Register the connection manager as a scan target for the connection-leak check.
this.taskScanner.add(this.connectionManager);
// Start the background scanning.
this.taskScanner.start();
...
}
疑问
- 重复关闭 netty channel 是否会出错?不会,所以可以重复 close。
- 只有客户端有连接泄露检测操作,服务端没有?如果服务端开启了连接管理器功能,那么服务端也需要开启,否则服务端连接管理器中的 List<Connection> 中的 Connection 无法删除?