SOFABolt 的停机机制分为服务端 RpcServer 停机 和客户端 RpcClient 停机。
- RpcServer 停机步骤
- 关闭 netty 服务端 boss channel
- 关闭 bossGroup,workerGroup 不能关掉(因为 workerGroup 是多个 RpcServer 共享的)
- 如果服务端连接管理器是开启的,遍历所有的连接池 ConnectionPool,关闭池内的所有 Connection(进而关闭 Connection 内部的 netty 服务端 worker channel)
- RpcClient 停机步骤
- 遍历所有的连接池 ConnectionPool,关闭池内的所有 Connection(进而关闭 Connection 内部的 netty 服务端 worker channel)
- 关闭 Scannable 任务扫描线程
- 关闭重连线程 + 清空重连任务
- 取消连接监听器内的任务 + 销毁连接监听器的线程池
服务端停机 RpcServer
======================================= AbstractRemotingServer =======================================
public boolean stop() {
if (started.compareAndSet(true, false)) {
return this.doStop();
} else {
throw new IllegalStateException("ERROR: The server has already stopped!");
}
}
======================================= RpcServer =======================================
protected boolean doStop() {
// 1. 关闭 netty 服务端 boss channel
if (null != this.channelFuture) {
this.channelFuture.channel().close();
}
// 2. 关闭 bossGroup,workerGroup 不能关掉(因为是共享的)
if (this.switches().isOn(GlobalSwitch.SERVER_SYNC_STOP)) {
this.bossGroup.shutdownGracefully().awaitUninterruptibly();
} else {
this.bossGroup.shutdownGracefully();
}
// 3. 如果服务端连接管理器是开启的,遍历所有的连接池 ConnectionPool,关闭连接(内部关闭 netty 服务端 worker channel)
if (this.switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH) && null != this.connectionManager) {
this.connectionManager.removeAll();
}
return true;
}
======================================= DefaultConnectionManager =======================================
// connection pool initialize tasks
protected ConcurrentHashMap<String, RunStateRecordedFutureTask<ConnectionPool>> connTasks;
public void removeAll() {
if (null == this.connTasks || this.connTasks.isEmpty()) {
return;
}
if (null != this.connTasks && !this.connTasks.isEmpty()) {
Iterator<String> iter = this.connTasks.keySet().iterator();
// 遍历所有的连接池 ConnectionPool,关闭所有 Connection(包括 Connection 内的 netty worker channel)
while (iter.hasNext()) {
String poolKey = iter.next();
this.removeTask(poolKey);
iter.remove();
}
}
}
private void removeTask(String poolKey) {
RunStateRecordedFutureTask<ConnectionPool> task = this.connTasks.remove(poolKey);
if (null != task) {
ConnectionPool pool = FutureTaskUtil.getFutureTaskResult(task, logger);
if (null != pool) {
pool.removeAllAndTryClose();
}
}
}
======================================= ConnectionPool =======================================
// remove all connections
public void removeAllAndTryClose() {
for (Connection conn : this.conns) {
removeAndTryClose(conn);
}
this.conns.clear();
}
public void removeAndTryClose(Connection connection) {
if (null == connection) {
return;
}
boolean res = this.conns.remove(connection);
if (res) {
connection.decreaseRef();
}
if (connection.noRef()) {
connection.close();
}
}
======================================= Connection =======================================
// Close the connection.
public void close() {
if (closed.compareAndSet(false, true)) {
if (this.getChannel() != null) {
this.getChannel().close();
}
}
}
客户端停机 RpcClient
======================================= RpcClient =======================================
public void shutdown() {
// 1. 遍历所有的连接池 ConnectionPool,关闭池内的所有 Connection(进而关闭 Connection 内部的 netty 服务端 worker channel)
// 这一步与服务端开启了连接管理一样,不再赘述
this.connectionManager.removeAll();
// 2. 关闭 Scannable 任务扫描线程
this.taskScanner.shutdown();
// 3. 关闭重连线程 + 清空重连任务
if (reconnectManager != null) {
reconnectManager.stop();
}
// 4. 销毁连接监听器的线程池
if (connectionMonitor != null) {
connectionMonitor.destroy();
}
}
======================================= RpcTaskScanner =======================================
private ScheduledExecutorService scheduledService = new ScheduledThreadPoolExecutor(1,new NamedThreadFactory("RpcTaskScannerThread", true));
// Shutdown the scheduled service.
public void shutdown() {
scheduledService.shutdown();
}
======================================= ReconnectManager =======================================
private final Thread healConnectionThreads; // 重连线程
// stop reconnect thread
public void stop() {
if (!this.started) {
return;
}
this.started = false;
healConnectionThreads.interrupt();
this.tasks.clear();
this.canceled.clear();
}
======================================= DefaultConnectionMonitor =======================================
private ScheduledThreadPoolExecutor executor;
// cancel task and shutdown executor
public void destroy() {
// Tries to remove from the work queue all Future tasks that have been cancelled
executor.purge();
executor.shutdown();
}