本文主要是针对solr启动过程的理解。⚠️使用solr-6.3.0版本
启动入口
web.xml
solr内嵌了jetty作为web容器,所以solr归根结底是一个web服务,因此可以从web.xml入手查看:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Any path (name) registered in solrconfig.xml will be sent to that filter -->
<!-- Declares the servlet filter: the name SolrRequestFilter is bound to the class SolrDispatchFilter. -->
<filter>
<filter-name>SolrRequestFilter</filter-name>
<filter-class>org.apache.solr.servlet.SolrDispatchFilter</filter-class>
<!-- excludePatterns: comma-separated patterns covering the css, js, img and tpl paths.
     Presumably requests matching one of them are not dispatched by the filter (TODO confirm in SolrDispatchFilter). -->
<init-param>
<param-name>excludePatterns</param-name>
<param-value>/css/.+,/js/.+,/img/.+,/tpl/.+</param-value>
</init-param>
</filter>
<!-- Maps the filter to every URL, so each incoming request passes through SolrRequestFilter. -->
<filter-mapping>
<filter-name>SolrRequestFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
从web.xml可以看出,solr的所有请求都会经过SolrRequestFilter这个filter,所以猜测solr的入口在这里。打开一看,果不其然,入口就是org.apache.solr.servlet.SolrDispatchFilter这个类。
SolrDispatchFilter
/** The core container of this node; assigned during {@code init} via {@code createCoreContainer}. */
protected volatile CoreContainer cores;
/**
 * Filter initialisation (excerpt: the dotted lines mark code elided by the article).
 * The visible part resolves the Solr home directory and creates the {@link CoreContainer}.
 *
 * @param config the filter configuration; its servlet context is queried for the Solr home attribute
 * @throws ServletException declared by the servlet filter contract
 */
@Override
public void init(FilterConfig config) throws ServletException
{
...
// Solr home is taken from the servlet-context attribute when present ...
String solrHome = (String) config.getServletContext().getAttribute(SOLRHOME_ATTRIBUTE);
// ... otherwise it falls back to SolrResourceLoader.locateSolrHome().
this.cores = createCoreContainer(solrHome == null ? SolrResourceLoader.locateSolrHome() : Paths.get(solrHome),
extraProperties);
....
}
/**
 * Creates the {@link CoreContainer} for this filter in two steps: first the node
 * configuration is loaded, then the container is constructed from it and its cores are loaded.
 * The new container is stored in the {@code cores} field before {@code load()} is invoked.
 *
 * @param solrHome        the Solr home directory
 * @param extraProperties additional properties handed to both the node config loader and the container
 * @return the container that was created and loaded
 */
protected CoreContainer createCoreContainer(Path solrHome, Properties extraProperties) {
  final NodeConfig nodeConfig = loadNodeConfig(solrHome, extraProperties);

  this.cores = new CoreContainer(nodeConfig, extraProperties, true);
  this.cores.load();

  return this.cores;
}
从createCoreContainer方法可以看出,init的过程基本分为两个部分:
- 加载nodeconfig
- 加载cores
⚠️实际上,除了old UI,其他所有的请求都通过这个filter进行处理并返回结果。这一点会在后续解析查询请求代码时再作说明。
整体过程如图所示,
加载nodeconfig
/**
 * Loads the node configuration (solr.xml).
 * <p>
 * When the {@code zkHost} system property is set and ZooKeeper holds a {@code /solr.xml}
 * node, the configuration is read from ZooKeeper. In every other case it is read from
 * the instance path of the resource loader created for {@code solrHome}.
 *
 * @param solrHome       the Solr home directory
 * @param nodeProperties properties passed to the resource loader
 * @return the parsed node configuration
 * @throws SolrException (SERVER_ERROR) if anything fails while talking to ZooKeeper or
 *                       while parsing the solr.xml obtained from it
 */
public static NodeConfig loadNodeConfig(Path solrHome, Properties nodeProperties) {
  final SolrResourceLoader resourceLoader = new SolrResourceLoader(solrHome, null, nodeProperties);
  final String zkHost = System.getProperty("zkHost");

  // No ZooKeeper configured: solr.xml can only come from the Solr home.
  if (StringUtils.isEmpty(zkHost)) {
    return SolrXmlConfig.fromSolrHome(resourceLoader, resourceLoader.getInstancePath());
  }

  try (SolrZkClient zk = new SolrZkClient(zkHost, 30000)) {
    if (zk.exists("/solr.xml", true)) {
      log.info("solr.xml found in ZooKeeper. Loading...");
      final byte[] solrXmlBytes = zk.getData("/solr.xml", null, null, true);
      return SolrXmlConfig.fromInputStream(resourceLoader, new ByteArrayInputStream(solrXmlBytes));
    }
  } catch (Exception e) {
    throw new SolrException(ErrorCode.SERVER_ERROR, "Error occurred while loading solr.xml from zookeeper", e);
  }

  // ZooKeeper is configured but has no /solr.xml: fall back to the Solr home.
  log.info("Loading solr.xml from SolrHome (not found in ZooKeeper)");
  return SolrXmlConfig.fromSolrHome(resourceLoader, resourceLoader.getInstancePath());
}
/** File name of the node configuration file looked up inside the Solr home. */
public static final String SOLR_XML_FILE = "solr.xml";

/**
 * Loads the node configuration from the {@code solr.xml} file located directly under the given Solr home.
 *
 * @param loader   the resource loader handed on to {@code fromFile}
 * @param solrHome the directory expected to contain {@link #SOLR_XML_FILE}
 * @return the node configuration produced by {@code fromFile}
 */
public static NodeConfig fromSolrHome(SolrResourceLoader loader, Path solrHome) {
  final Path solrXmlPath = solrHome.resolve(SOLR_XML_FILE);
  return fromFile(loader, solrXmlPath);
}
由此可见,loadNodeConfig只负责加载solr.xml:如果设置了zkHost且ZooKeeper中存在/solr.xml,则从ZooKeeper加载;否则从solr.home加载,此时必须保证指定的solr.home下存在solr.xml文件。
加载cores
//CoreContainer
/**
* Load the cores defined for this CoreContainer.
* <p>
* Visible steps, in order: add the shared lib directory to the class loader, create the
* shard handlers, log watcher and ZooKeeper integration, initialise the security plugins,
* register the container-level (admin) handlers, then discover the core descriptors and
* submit the ones flagged load-on-startup to a dedicated executor. When
* {@code asyncSolrCoreLoad} is set, waiting for those loads is handed to
* {@code coreContainerWorkExecutor}; otherwise this method blocks until the load executor
* has terminated.
*/
public void load() {
log.debug("Loading cores into CoreContainer [instanceDir={}]", loader.getInstancePath());
// add the sharedLib to the shared resource loader before initializing cfg based plugins
String libDir = cfg.getSharedLibDirectory();
if (libDir != null) {
Path libPath = loader.getInstancePath().resolve(libDir);
try {
loader.addToClassLoader(SolrResourceLoader.getURLs(libPath));
loader.reloadLuceneSPI();
} catch (IOException e) {
if (!libDir.equals("lib")) { // Don't complain if default "lib" dir does not exist
log.warn("Couldn't add files from {} to classpath: {}", libPath, e.getMessage());
}
}
}
shardHandlerFactory = ShardHandlerFactory.newInstance(cfg.getShardHandlerFactoryPluginInfo(), loader); // initialise shardHandlerFactory
updateShardHandler = new UpdateShardHandler(cfg.getUpdateShardHandlerConfig());// initialise updateShardHandler
solrCores.allocateLazyCores(cfg.getTransientCacheSize(), loader);
logging = LogWatcher.newRegisteredLogWatcher(cfg.getLogWatcherConfig(), loader);
hostName = cfg.getNodeName();
zkSys.initZooKeeper(this, solrHome, cfg.getCloudConfig());
// The PKI authentication plugin is only created when running with ZooKeeper.
if(isZooKeeperAware()) pkiAuthenticationPlugin = new PKIAuthenticationPlugin(this, zkSys.getZkController().getNodeName());
MDCLoggingContext.setNode(this);
// initialise the security plugins; without ZooKeeper an empty config (EMPTY_MAP, version -1) is used
ZkStateReader.ConfigData securityConfig = isZooKeeperAware() ? getZkController().getZkStateReader().getSecurityProps(false) : new ZkStateReader.ConfigData(EMPTY_MAP, -1);
initializeAuthorizationPlugin((Map<String, Object>) securityConfig.data.get("authorization"));
initializeAuthenticationPlugin((Map<String, Object>) securityConfig.data.get("authentication"));
// initialise backupRepoFactory
this.backupRepoFactory = new BackupRepositoryFactory(cfg.getBackupRepositoryPlugins());
// initialise the admin-family handlers and register each under its container-level path
containerHandlers.put(ZK_PATH, new ZookeeperInfoHandler(this));
securityConfHandler = new SecurityConfHandler(this);
collectionsHandler = createHandler(cfg.getCollectionsHandlerClass(), CollectionsHandler.class);
containerHandlers.put(COLLECTIONS_HANDLER_PATH, collectionsHandler);
infoHandler = createHandler(cfg.getInfoHandlerClass(), InfoHandler.class);
containerHandlers.put(INFO_HANDLER_PATH, infoHandler);
coreAdminHandler = createHandler(cfg.getCoreAdminHandlerClass(), CoreAdminHandler.class);
containerHandlers.put(CORES_HANDLER_PATH, coreAdminHandler);
configSetsHandler = createHandler(cfg.getConfigSetsHandlerClass(), ConfigSetsHandler.class);
containerHandlers.put(CONFIGSETS_HANDLER_PATH, configSetsHandler);
// the same SecurityConfHandler instance serves both the authorization and the authentication path
containerHandlers.put(AUTHZ_PATH, securityConfHandler);
containerHandlers.put(AUTHC_PATH, securityConfHandler);
if(pkiAuthenticationPlugin != null)
containerHandlers.put(PKIAuthenticationPlugin.PATH, pkiAuthenticationPlugin.getRequestHandler());
coreConfigService = ConfigSetService.createConfigSetService(cfg, loader, zkSys.zkController);// original note: coreConfigService manages the config files stored in ZK (TODO confirm for the non-cloud case)
containerProperties.putAll(cfg.getSolrProperties());
// begin loading cores
// setup executor to load cores in parallel
ExecutorService coreLoadExecutor = ExecutorUtil.newMDCAwareFixedThreadPool(
cfg.getCoreLoadThreadCount(isZooKeeperAware()),
new DefaultSolrThreadFactory("coreLoadExecutor") );
final List<Future<SolrCore>> futures = new ArrayList<>();
try {
// discover the cores from solr home
List<CoreDescriptor> cds = coresLocator.discover(this); // original note: CorePropertiesLocator walks solr_home looking for core.properties files
if (isZooKeeperAware()) {
//sort the cores if it is in SolrCloud. In standalone node the order does not matter
CoreSorter coreComparator = new CoreSorter().init(this);
cds = new ArrayList<>(cds);//make a copy
Collections.sort(cds, coreComparator::compare);
}
checkForDuplicateCoreNames(cds);
// load the cores
for (final CoreDescriptor cd : cds) {
// Transient or not-load-on-startup cores are only recorded as dynamic descriptors here;
// the remaining ones are marked as "loading" when loading is asynchronous.
if (cd.isTransient() || !cd.isLoadOnStartup()) {
solrCores.putDynamicDescriptor(cd.getName(), cd);
} else if (asyncSolrCoreLoad) {
solrCores.markCoreAsLoading(cd);
}
if (cd.isLoadOnStartup()) {
futures.add(coreLoadExecutor.submit(() -> {
SolrCore core;
try {
if (zkSys.getZkController() != null) {
zkSys.getZkController().throwErrorIfReplicaReplaced(cd);
}
core = create(cd, false, false); // create the core from its descriptor; original note: it is linked to the config files in ZK and its state is "down" at this point
} finally {
// always clear the "loading" mark, even when creation failed
if (asyncSolrCoreLoad) {
solrCores.markCoreAsNotLoading(cd);
}
}
try {
zkSys.registerInZk(core, true, false);// register with background=true; original note: this is where the core is really brought up and data recovery (leader to replica) happens, in the background
} catch (RuntimeException e) {
// a registration failure is logged but does not fail the load task
SolrException.log(log, "Error registering SolrCore", e);
}
return core;
}));
}
}
// end of core loading
// Start the background thread
backgroundCloser = new CloserThread(this, solrCores, cfg);
backgroundCloser.start();
} finally {
if (asyncSolrCoreLoad && futures != null) {
// Asynchronous mode: another executor waits for every load task, then shuts the load executor down,
// so this method can return before the cores are ready.
coreContainerWorkExecutor.submit((Runnable) () -> {
try {
for (Future<SolrCore> future : futures) {
try {
future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
log.error("Error waiting for SolrCore to be created", e);
}
}
} finally {
ExecutorUtil.shutdownAndAwaitTermination(coreLoadExecutor);// wait until initialisation has finished
}
});
} else {
// Synchronous mode: block here until all submitted load tasks have completed.
ExecutorUtil.shutdownAndAwaitTermination(coreLoadExecutor);
}
}
if (isZooKeeperAware()) {
zkSys.getZkController().checkOverseerDesignate();
}
}
如上代码中注释所示,load过程主要完成以下工作:
- 初始化管理shard的shardHandlerFactory、updateShardHandler
- 初始化管理admin的containerHandlers
- 遍历solr_home获取core.properties,获取所有core的元数据
- 并发加载cores:
1. 初始化core:
根据core描述文件,关联zk上的配置文件
此时core状态为down
完成core的各种handler的初始化
2. 在zk上注册core:
向zk注册该core
完成数据的恢复(leader数据迁移到副本)
此时core的状态为recovering
⚠️此过程在后台执行,不影响solr的启动
初始化core
/**
 * Builds a {@link SolrCore} from the given descriptor and registers it with this container.
 * <p>
 * Any failure is recorded in {@code coreInitFailures} under the core's name and logged;
 * a core that was already constructed and is not yet closed is closed quietly before the
 * failure is propagated.
 *
 * @param dcore         a core descriptor
 * @param publishState  publish core state to the cluster if true (handed to {@code registerCore})
 * @param newCollection handed to {@code registerCore} unchanged
 *
 * @return the newly created core
 * @throws SolrException SERVICE_UNAVAILABLE if the container has been shut down;
 *                       SERVER_ERROR wrapping any {@link Exception} raised during creation
 */
private SolrCore create(CoreDescriptor dcore, boolean publishState, boolean newCollection) {
  if (isShutDown) {
    throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Solr has been shutdown.");
  }

  SolrCore core = null;
  try {
    MDCLoggingContext.setCore(core);
    SolrIdentifierValidator.validateCoreName(dcore.getName());

    // Cloud mode only. Original note: preRegister checks state.json for the core, marks the
    // core as down and adds a watch on the collection's state.json (not verifiable from here).
    if (zkSys.getZkController() != null) {
      zkSys.getZkController().preRegister(dcore);
    }

    // Resolve the config set that belongs to this core.
    final ConfigSet configSet = coreConfigService.getConfig(dcore);
    log.info("Creating SolrCore '{}' using configuration from {}", dcore.getName(), configSet.getName());

    // Construct the core itself (original note: this includes setting up its handlers).
    core = new SolrCore(dcore, configSet);

    // always kick off recovery if we are in non-Cloud mode
    final boolean standalone = !isZooKeeperAware();
    if (standalone && core.getUpdateHandler().getUpdateLog() != null) {
      core.getUpdateHandler().getUpdateLog().recoverFromLog();
    }

    registerCore(dcore.getName(), core, publishState, newCollection);
    return core;
  } catch (Exception ex) {
    coreInitFailures.put(dcore.getName(), new CoreLoadFailure(dcore, ex));
    log.error("Error creating core [{}]: {}", dcore.getName(), ex.getMessage(), ex);
    final SolrException wrapped = new SolrException(ErrorCode.SERVER_ERROR, "Unable to create core [" + dcore.getName() + "]", ex);
    if (core != null && !core.isClosed()) {
      IOUtils.closeQuietly(core);
    }
    throw wrapped;
  } catch (Throwable t) {
    // Non-Exception throwables: record a wrapped failure, but rethrow the original throwable.
    final SolrException recorded = new SolrException(ErrorCode.SERVER_ERROR, "JVM Error creating core [" + dcore.getName() + "]: " + t.getMessage(), t);
    log.error("Error creating core [{}]: {}", dcore.getName(), t.getMessage(), t);
    coreInitFailures.put(dcore.getName(), new CoreLoadFailure(dcore, recorded));
    if (core != null && !core.isClosed()) {
      IOUtils.closeQuietly(core);
    }
    throw t;
  } finally {
    MDCLoggingContext.clear();
  }
}
如代码注释所示,详细过程不再赘述;
在zk上注册core
/**
 * Registers the given core with ZooKeeper through the {@code zkController}; does nothing
 * when no {@code zkController} is present.
 * <p>
 * If registration fails with an exception other than an interrupt, the core is published
 * as {@code Replica.State.DOWN} and the failure is logged; nothing is propagated to the caller.
 *
 * @param core         the core to register
 * @param background   when true the registration is handed to the {@code coreZkRegister}
 *                     executor and this method returns without waiting for it; when false
 *                     it runs on the calling thread
 * @param skipRecovery forwarded to {@code zkController.register}
 */
public void registerInZk(final SolrCore core, boolean background, boolean skipRecovery) {
  if (zkController == null) {
    return;
  }

  final Runnable registration = () -> {
    MDCLoggingContext.setCore(core);
    try {
      zkController.register(core.getName(), core.getCoreDescriptor(), skipRecovery);
    } catch (InterruptedException interrupted) {
      // Restore the interrupted status
      Thread.currentThread().interrupt();
      SolrException.log(log, "", interrupted);
    } catch (Exception registerFailure) {
      // Registration failed: try to publish the core as DOWN, then log the original failure.
      try {
        zkController.publish(core.getCoreDescriptor(), Replica.State.DOWN);
      } catch (InterruptedException publishInterrupted) {
        Thread.currentThread().interrupt();
        log.error("", publishInterrupted);
      } catch (Exception publishFailure) {
        log.error("", publishFailure);
      }
      SolrException.log(log, "", registerFailure);
    } finally {
      MDCLoggingContext.clear();
    }
  };

  if (background) {
    // Fire and forget: the caller does not wait for the registration to finish.
    coreZkRegister.execute(registration);
    return;
  }

  MDCLoggingContext.setCore(core);
  try {
    registration.run();
  } finally {
    MDCLoggingContext.clear();
  }
}
核心过程在zkController.register(core.getName(), core.getCoreDescriptor(), skipRecovery);
/**
* Register shard with ZooKeeper.
* <p>
* Visible sequence: join the leader election for the shard, look up the shard leader,
* replay the local update log where applicable, run the recovery check, and publish the
* replica as ACTIVE when no recovery was started.
*
* @param coreName name of the core; stored under {@code ZkStateReader.CORE_NAME_PROP} and used to build this core's URL
* @param desc descriptor of the core; supplies the cloud descriptor (collection, shard id, core node name)
* @param recoverReloadedCores forwarded to {@code checkRecovery}
* @param afterExpiration forwarded to {@code joinElection} and {@code checkRecovery}; when true the update-log replay is skipped
* @param skipRecovery forwarded to {@code checkRecovery}
* @return the shardId for the SolrCore
* @throws Exception declared broadly; interrupts and ZooKeeper/IO errors raised while joining the election are rethrown as ZooKeeperException
*/
public String register(String coreName, final CoreDescriptor desc, boolean recoverReloadedCores,
boolean afterExpiration, boolean skipRecovery) throws Exception {
// The core is obtained only to set the logging context; try-with-resources closes the reference right away.
try (SolrCore core = cc.getCore(desc.getName())) {
MDCLoggingContext.setCore(core);
}
try {
// pre register has published our down state
final String baseUrl = getBaseUrl();
final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
final String collection = cloudDesc.getCollectionName();
final String coreZkNodeName = desc.getCloudDescriptor().getCoreNodeName();
assert coreZkNodeName != null : "we should have a coreNodeName by now";
String shardId = cloudDesc.getShardId();
Map<String,Object> props = new HashMap<>();
// we only put a subset of props into the leader node
props.put(ZkStateReader.BASE_URL_PROP, baseUrl);
props.put(ZkStateReader.CORE_NAME_PROP, coreName);
props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
log.debug("Register replica - core:{} address:{} collection:{} shard:{}",
coreName, baseUrl, cloudDesc.getCollectionName(), shardId);
// NOTE(review): leaderProps is not referenced again in this excerpt
ZkNodeProps leaderProps = new ZkNodeProps(props);
try {
// If we're a preferred leader, insert ourselves at the head of the queue
boolean joinAtHead = false;
Replica replica = zkStateReader.getClusterState().getReplica(desc.getCloudDescriptor().getCollectionName(),
coreZkNodeName);
if (replica != null) {
joinAtHead = replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false);
}
joinElection(desc, afterExpiration, joinAtHead);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
} catch (KeeperException | IOException e) {
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
}
// in this case, we want to wait for the leader as long as the leader might
// wait for a vote, at least - but also long enough that a large cluster has
// time to get its act together
String leaderUrl = getLeader(cloudDesc, leaderVoteWait + 600000); // look up the elected leader; original note gives an example election path: /infra-solr/collections/aaa/leader_elect/shard1/election
String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
log.debug("We are " + ourUrl + " and leader is " + leaderUrl);
// we are the leader exactly when the leader URL equals our own core URL
boolean isLeader = leaderUrl.equals(ourUrl);
try (SolrCore core = cc.getCore(desc.getName())) {
// recover from local transaction log and wait for it to complete before
// going active
// TODO: should this be moved to another thread? To recoveryStrat?
// TODO: should this actually be done earlier, before (or as part of)
// leader election perhaps?
UpdateLog ulog = core.getUpdateHandler().getUpdateLog(); // the update log (tlog) that is replayed below
// we will call register again after zk expiration and on reload
if (!afterExpiration && !core.isReloaded() && ulog != null) {
// disable recovery in case shard is in construction state (for shard splits)
Slice slice = getClusterState().getSlice(collection, shardId);
if (slice.getState() != Slice.State.CONSTRUCTION || !isLeader) {
Future<UpdateLog.RecoveryInfo> recoveryFuture = core.getUpdateHandler().getUpdateLog().recoverFromLog();
if (recoveryFuture != null) {
log.info("Replaying tlog for " + ourUrl + " during startup... NOTE: This can take a while.");
recoveryFuture.get(); // NOTE: this could potentially block for
// minutes or more!
// TODO: public as recovering in the mean time?
// TODO: in the future we could do peersync in parallel with recoverFromLog
} else {
log.debug("No LogReplay needed for core={} baseURL={}", core.getName(), baseUrl);
}
}
}
// original note: recover from the leader so that this replica reaches the same data version
boolean didRecovery
= checkRecovery(recoverReloadedCores, isLeader, skipRecovery, collection, coreZkNodeName, core, cc, afterExpiration);
if (!didRecovery) {
publish(desc, Replica.State.ACTIVE); // no recovery was started: the state becomes ACTIVE
}
core.getCoreDescriptor().getCloudDescriptor().setHasRegistered(true);
}
// make sure we have an update cluster state right away
zkStateReader.forceUpdateCollection(collection);
return shardId;
} finally {
MDCLoggingContext.clear();
}
}
如上注释所示,这一步并不是简单地在zk中注册这个core,而是经过了以下过程:
- leader选举
- 从tlog重放数据,恢复现场
- 从leader同步数据(如果它不是leader)