Cassandra 的入口
CassandraDaemon 作为Cassandra的入口,做了以下几件事:
- load configuration
- 注册 MBean service
- load schemas
- scrub data directories
- init keyspaces
- start StorageService
- start gossip service
- start native/thrift server
main()
public static void main(String[] args)
{
instance.activate();
}
activate()
public void activate()
{
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
mbs.registerMBean(new StandardMBean(new NativeAccess(), NativeAccessMBean.class), new ObjectName(MBEAN_NAME));
setup();
start();
}
setup()
protected void setup()
{
// check the system keyspace to keep user from shooting self in foot by changing partitioner, cluster name, etc.
// we do a one-off scrub of the system keyspace first; we can't load the list of the rest of the keyspaces,
// until system keyspace is opened.
for (CFMetaData cfm : Schema.instance.getKeyspaceMetaData(Keyspace.SYSTEM_KS).values())
ColumnFamilyStore.scrubDataDirectories(cfm);
SystemKeyspace.checkHealth();
// load keyspace descriptions.
DatabaseDescriptor.loadSchemas();
// clean up compaction leftovers
Map<Pair<String, String>, Map<Integer, UUID>> unfinishedCompactions = SystemKeyspace.getUnfinishedCompactions();
// clean up debris in the rest of the keyspaces
for (String keyspaceName : Schema.instance.getKeyspaces())
{
//...
}
Keyspace.setInitialized();
// initialize keyspaces
for (String keyspaceName : Schema.instance.getKeyspaces())
{
//...
}
// replay the log if necessary
CommitLog.instance.recover();
// start server internals
StorageService.instance.registerDaemon(this);
StorageService.instance.initServer();
// Metrics
String metricsReporterConfigFile = System.getProperty("cassandra.metricsReporterConfigFile");
// schedule periodic background compaction task submission. this is simply a backstop against compactions stalling
// due to scheduling errors or race conditions
ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(ColumnFamilyStore.getBackgroundCompactionTaskSubmitter(), 5, 1, TimeUnit.MINUTES);
// Thrift
thriftServer = new ThriftServer(rpcAddr, rpcPort, listenBacklog);
// Native transport
nativeServer = new org.apache.cassandra.transport.Server(nativeAddr, nativePort);
completeSetup();
}
start()
public void start()
{
nativeServer.start();
thriftServer.start();
}
初始化 DatabaseDescriptor
通过 static block 初始化
static
{
applyConfig(loadConfig());
}
首先是读入config, 用yaml的library读取config (org.yaml.snakeyaml).
@VisibleForTesting
public static Config loadConfig() throws ConfigurationException
{
String loaderClass = System.getProperty("cassandra.config.loader");
ConfigurationLoader loader = loaderClass == null
? new YamlConfigurationLoader()
: FBUtilities.<ConfigurationLoader>construct(loaderClass, "configuration loading");
return loader.loadConfig();
}
再apply config, 主要是validate config, 用config 初始化某些service比如snitch, comparator, memoryAllocator.