概述
DBLE在启动时,由作为入口的com.actiontech.dble.DbleServerStartup.main()
控制,依次调用com.actiontech.dble.DbleServer
的以下函数,来完成整个启动过程。这与MyCat的行为相仿。
com.actiontech.dble.DbleServer()
com.actiontech.dble.DbleServer.beforeStart()
com.actiontech.dble.DbleServer.startup()
无论是MyCat的MycatServer
,还是DBLE的DbleServer
,XXXServer这个类并没有自己的线程。完成上面的函数之后,主线程离开DbleServer
的相关函数,并完全交出CPU控制权,后续将由持有子线程的其他模块来驱动程序运作。
换句话说,DbleServer
是把各个功能模块封装起来的逻辑盒子,它主要的工作就是初始化和启动DBLE的各种功能模块。
第一阶段:com.actiontech.dble.DbleServer()
DBLE在全局公用类的设计上,继承了MyCat的做法,大量使用单实例设计模式(Singleton)。DbleServer
本身是单实例设计,其中还包含了大量非传递(non-transitivity)成员变量,并以此来实现其他一些类(或者说是功能模块)的单实例设计。
DbleServer()
的主要工作就是对非传递成员变量进行赋值:
- 读取配置文件
private final ServerConfig config;
// ...
this.config = new ServerConfig();
- 创建定时任务的计时线程池
private final ScheduledExecutorService scheduler;
// ...
scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("TimerScheduler-%d").build());
- 设置服务上线标识
private final AtomicBoolean isOnline;
// ...
this.isOnline = new AtomicBoolean(true);
- 创建缓存服务
private final CacheService cacheService;
// ...
cacheService = new CacheService(config.getSystem().isLowerCaseTableNames());
- 创建路由服务
private final RouteService routerService;
// ...
routerService = new RouteService(cacheService);
- 记录启动时间
private final long startupTime;
// ...
this.startupTime = TimeUtil.currentTimeMillis();
- 创建会话XA检查服务
这是MyCat没有,DBLE特有的功能组件。原理上是提供commitSession
和rollbackSession
这两个map,和它们的commit/rollback状态检查,来协助实现XA中的全局事务管理器概念。
虽然没有进行final
修饰,但是它是实例私有(private
),也没有提供setter,而且DbleServer
中没有更改过它的对象引用,所以也可以认为它是单实例模式的设计。
private XASessionCheck xaSessionCheck;
// ...
xaSessionCheck = new XASessionCheck();
- 创建序列号发生器
private final SequenceHandler sequenceHandler;
// ...
sequenceHandler = initSequenceHandler(config.getSystem().getSequnceHandlerType());
与MycatServer
相比,DbleServer
在这个阶段(MycatServer
的实例化过程中),并不实例化DynaClassLoader
(catlet加载器)和SQLInterceptor
(SQL解析器,封装了druid)。这是因为DbleServer
从设计上就移除了这两个成员,是设计上的显著不同。
第二阶段:com.actiontech.dble.DbleServer.beforeStart()
和MyCat一样,beforeStart()
仅仅调用SystemConfig.getHomePath()
,这是一个只读不写的静态方法,所以并不会引起任何变化。
public void beforeStart() {
SystemConfig.getHomePath();
}
另外,com.actiontech.dble.DbleServerStartup.main()
在实例化DbleServer
之前也调用同一方法,用来确认DBLE_HOME。
所以,从功能上来看这个步骤是没有意义的一步。MyCat和DBLE都没有移除它,可能是想保留一个扩展点吧。
第三阶段:com.actiontech.dble.DbleServer.startup()
该阶段主要是创建了使外部能够访问和利用DBLE的协作类:业务前端SocketAcceptor server
、管理前端SocketAcceptor manager
和处理器组NIOProcessor processors[]
。
但是,从最终效果来说,其实这些类的实例化与初始化工作前移到DbleServer
的实例化阶段(启动的第一阶段)也没有问题(reload操作针对的是ServerConfig
)。将这些类的初始化和启动混放在本阶段,各阶段间的逻辑区分不太清晰,应该是来自MyCat的的历史包袱。
- 创建备份时用的业务阻塞锁
这是MyCat所不具备的特性,这个特性对于集群备份非常必要。
private AtomicBoolean backupLocked;
// ...
backupLocked = new AtomicBoolean(false);
- 创建业务端口(TCP 8066)的聆听器
ServerConnectionFactory sf = new ServerConnectionFactory();
SocketAcceptor server = null;
// ...
server = new NIOAcceptor(DirectByteBufferPool.LOCAL_BUF_THREAD_PREX + NAME + "Server", system.getBindIp(),
system.getServerPort(), system.getServerBacklog(), sf, reactorPool);
// ...
server.start();
- 创建管理端口(TCP 9066)的聆听器
ManagerConnectionFactory mf = new ManagerConnectionFactory();
SocketAcceptor manager = null;
// ...
manager = new NIOAcceptor(DirectByteBufferPool.LOCAL_BUF_THREAD_PREX + NAME + "Manager", system.getBindIp(),
system.getManagerPort(), 100, mf, reactorPool);
// ...
manager.start();
- 分配内存缓冲池
int bufferPoolPageSize = system.getBufferPoolPageSize();
// total page number
short bufferPoolPageNumber = system.getBufferPoolPageNumber();
//minimum allocation unit
short bufferPoolChunkSize = system.getBufferPoolChunkSize();
totalNetWorkBufferSize = bufferPoolPageSize * bufferPoolPageNumber;
if (totalNetWorkBufferSize > Platform.getMaxDirectMemory()) {
LOGGER.error("Direct BufferPool size lager than MaxDirectMemory");
throw new IOException("Direct BufferPool size lager than MaxDirectMemory");
}
bufferPool = new DirectByteBufferPool(bufferPoolPageSize, bufferPoolChunkSize, bufferPoolPageNumber);
- 分配用于跨节点结果整合/排序/分组/分页用的堆外内存
if (system.getUseOffHeapForMerge() == 1) {
try {
serverMemory = new SeverMemory(system, totalNetWorkBufferSize);
} catch (NoSuchFieldException e) {
LOGGER.error("NoSuchFieldException", e);
} catch (IllegalAccessException e) {
LOGGER.error("Error", e);
}
}
- 创建业务处理池
int threadPoolSize = system.getProcessorExecutor();
// ...
businessExecutor = ExecutorUtil.createFixed("BusinessExecutor", threadPoolSize);
- 创建复杂查询处理池
int threadPoolSize = system.getProcessorExecutor();
// ...
complexQueryExecutor = ExecutorUtil.createCached("complexQueryExecutor", threadPoolSize);
- 创建定时任务处理池
timerExecutor = ExecutorUtil.createFixed("Timer", 1);
- 将内存缓冲池和业务处理池组合成一个抽象对象,处理器
int processorCount = system.getProcessors();
processors = new NIOProcessor[processorCount];
// ...
for (int i = 0; i < processors.length; i++) {
processors[i] = new NIOProcessor("Processor" + i, bufferPool, businessExecutor);
}
- 创建交易语句记录器
if (config.getSystem().getRecordTxn() == 1) {
txnLogProcessor = new TxnLogProcessor(bufferPool);
txnLogProcessor.setName("TxnLogProcessor");
txnLogProcessor.start();
}
- 创建互斥元数据管理器
tmManager = new ProxyMetaManager();
if (!this.getConfig().isDataHostWithoutWR()) {
//init tmManager
try {
tmManager.init(this.getConfig());
} catch (Exception e) {
throw new IOException(e);
}
}
- 进行XA的crash recovery
performXARecoveryLog();
- 注册定时任务
long dataNodeIdleCheckPeriod = system.getDataNodeIdleCheckPeriod();
scheduler.scheduleAtFixedRate(updateTime(), 0L, TIME_UPDATE_PERIOD, TimeUnit.MILLISECONDS);
scheduler.scheduleWithFixedDelay(processorCheck(), 0L, system.getProcessorCheckPeriod(), TimeUnit.MILLISECONDS);
scheduler.scheduleAtFixedRate(dataNodeConHeartBeatCheck(dataNodeIdleCheckPeriod), 0L, dataNodeIdleCheckPeriod, TimeUnit.MILLISECONDS);
//dataHost heartBeat will be influence by dataHostWithoutWR
scheduler.scheduleAtFixedRate(dataNodeHeartbeat(), 0L, system.getDataNodeHeartbeatPeriod(), TimeUnit.MILLISECONDS);
scheduler.scheduleAtFixedRate(dataSourceOldConsClear(), 0L, DEFAULT_OLD_CONNECTION_CLEAR_PERIOD, TimeUnit.MILLISECONDS);
scheduler.scheduleWithFixedDelay(xaSessionCheck(), 0L, system.getXaSessionCheckPeriod(), TimeUnit.MILLISECONDS);
scheduler.scheduleWithFixedDelay(xaLogClean(), 0L, system.getXaLogCleanPeriod(), TimeUnit.MILLISECONDS);
scheduler.scheduleWithFixedDelay(resultSetMapClear(), 0L, system.getClearBigSqLResultSetMapMs(), TimeUnit.MILLISECONDS);
if (system.getUseSqlStat() == 1) {
//sql record detail timing clean
scheduler.scheduleWithFixedDelay(recycleSqlStat(), 0L, DEFAULT_SQL_STAT_RECYCLE_PERIOD, TimeUnit.MILLISECONDS);
}
if (system.getUseGlobleTableCheck() == 1) { // will be influence by dataHostWithoutWR
scheduler.scheduleWithFixedDelay(globalTableConsistencyCheck(), 0L, system.getGlableTableCheckPeriod(), TimeUnit.MILLISECONDS);
}
- 初始化到数据库的连接
if (!this.getConfig().isDataHostWithoutWR()) {
// init datahost
// connect to Databases using conf settings
Map<String, PhysicalDBPool> dataHosts = this.getConfig().getDataHosts();
LOGGER.info("Initialize dataHost ...");
for (PhysicalDBPool node : dataHosts.values()) {
String index = dnIndexProperties.getProperty(node.getHostName(), "0");
if (!"0".equals(index)) {
LOGGER.info("init datahost: " + node.getHostName() + " to use datasource index:" + index);
}
int activeIndex = node.init(Integer.parseInt(index));
saveDataHostIndex(node.getHostName(), activeIndex);
node.startHeartbeat();
}
}