solr源码解析(一)———启动过程详解

本文主要是针对solr启动过程的理解。⚠️使用solr-6.3.0版本

启动入口

web.xml

solr在中嵌入了jetty作为web容器,所以solr归根结底是一个web服务,所以从web.xml入手查看:

<?xml version="1.0" encoding="UTF-8"?>

  <!-- Any path (name) registered in solrconfig.xml will be sent to that filter -->
  <filter>
    <filter-name>SolrRequestFilter</filter-name>
    <filter-class>org.apache.solr.servlet.SolrDispatchFilter</filter-class>
    <init-param>
      <param-name>excludePatterns</param-name>
      <param-value>/css/.+,/js/.+,/img/.+,/tpl/.+</param-value>
    </init-param>
  </filter>
  <filter-mapping>
    <filter-name>SolrRequestFilter</filter-name>
    <url-pattern>/*</url-pattern>
  </filter-mapping>

web.xml可以看出solr的所有请求都会经过SolrRequestFilter这个filter,所以猜测solr的入口在这里,打开一看,果不其然,入口就是org.apache.solr.servlet.SolrDispatchFilter这个类

SolrDispatchFilter

  protected volatile CoreContainer cores;
  @Override
  public void init(FilterConfig config) throws ServletException
  {
  ...
      String solrHome = (String) config.getServletContext().getAttribute(SOLRHOME_ATTRIBUTE);

      this.cores = createCoreContainer(solrHome == null ? SolrResourceLoader.locateSolrHome() : Paths.get(solrHome),
                                       extraProperties);
....
 }
  protected CoreContainer createCoreContainer(Path solrHome, Properties extraProperties) {
    NodeConfig nodeConfig = loadNodeConfig(solrHome, extraProperties);
    cores = new CoreContainer(nodeConfig, extraProperties, true);
    cores.load();
    return cores;
  }

createCoreContainer方法可以看出,init的过程基本分为两个部分:

  • 加载nodeconfig
  • 加载cores

⚠️实际上,除了old UI,其他所有的请求都通过这个filter进行处理并返回结果。这个可以在做查询请求代码解析的时候做出说明。

image.png

整体过程如图所示,

加载nodeconfig

  public static NodeConfig loadNodeConfig(Path solrHome, Properties nodeProperties) {

    SolrResourceLoader loader = new SolrResourceLoader(solrHome, null, nodeProperties);
    String zkHost = System.getProperty("zkHost");
    if (!StringUtils.isEmpty(zkHost)) {
      try (SolrZkClient zkClient = new SolrZkClient(zkHost, 30000)) {
        if (zkClient.exists("/solr.xml", true)) {
          log.info("solr.xml found in ZooKeeper. Loading...");
          byte[] data = zkClient.getData("/solr.xml", null, null, true);
          return SolrXmlConfig.fromInputStream(loader, new ByteArrayInputStream(data));
        }
      } catch (Exception e) {
        throw new SolrException(ErrorCode.SERVER_ERROR, "Error occurred while loading solr.xml from zookeeper", e);
      }
      log.info("Loading solr.xml from SolrHome (not found in ZooKeeper)");
    }
    return SolrXmlConfig.fromSolrHome(loader, loader.getInstancePath());
  }

public final static String SOLR_XML_FILE = "solr.xml";
public static NodeConfig fromSolrHome(SolrResourceLoader loader, Path solrHome) {
    return fromFile(loader, solrHome.resolve(SOLR_XML_FILE));
  }

由此可见,loadNodeConfig只是对于solr.xml的加载,必须保证指定的solr.home下存在solr.xml文件

加载cores

//CoreContainer
/**
   * Load the cores defined for this CoreContainer
   */
  public void load()  {
    log.debug("Loading cores into CoreContainer [instanceDir={}]", loader.getInstancePath());

    // add the sharedLib to the shared resource loader before initializing cfg based plugins
    String libDir = cfg.getSharedLibDirectory();
    if (libDir != null) {
      Path libPath = loader.getInstancePath().resolve(libDir);
      try {
        loader.addToClassLoader(SolrResourceLoader.getURLs(libPath));
        loader.reloadLuceneSPI();
      } catch (IOException e) {
        if (!libDir.equals("lib")) { // Don't complain if default "lib" dir does not exist
          log.warn("Couldn't add files from {} to classpath: {}", libPath, e.getMessage());
        }
      }
    }


    shardHandlerFactory = ShardHandlerFactory.newInstance(cfg.getShardHandlerFactoryPluginInfo(), loader); //初始化shardHandlerFactory

    updateShardHandler = new UpdateShardHandler(cfg.getUpdateShardHandlerConfig());//初始化updateShardHandler

    solrCores.allocateLazyCores(cfg.getTransientCacheSize(), loader);

    logging = LogWatcher.newRegisteredLogWatcher(cfg.getLogWatcherConfig(), loader);

    hostName = cfg.getNodeName();

    zkSys.initZooKeeper(this, solrHome, cfg.getCloudConfig());
    if(isZooKeeperAware())  pkiAuthenticationPlugin = new PKIAuthenticationPlugin(this, zkSys.getZkController().getNodeName());

    MDCLoggingContext.setNode(this);

    //安全认证插件初始化
    ZkStateReader.ConfigData securityConfig = isZooKeeperAware() ? getZkController().getZkStateReader().getSecurityProps(false) : new ZkStateReader.ConfigData(EMPTY_MAP, -1);
    initializeAuthorizationPlugin((Map<String, Object>) securityConfig.data.get("authorization"));
    initializeAuthenticationPlugin((Map<String, Object>) securityConfig.data.get("authentication"));

    //backupRepoFactory初始化
    this.backupRepoFactory = new BackupRepositoryFactory(cfg.getBackupRepositoryPlugins());

    //admin系列的handler初始化
    containerHandlers.put(ZK_PATH, new ZookeeperInfoHandler(this));
    securityConfHandler = new SecurityConfHandler(this);
    collectionsHandler = createHandler(cfg.getCollectionsHandlerClass(), CollectionsHandler.class);
    containerHandlers.put(COLLECTIONS_HANDLER_PATH, collectionsHandler);
    infoHandler        = createHandler(cfg.getInfoHandlerClass(), InfoHandler.class);
    containerHandlers.put(INFO_HANDLER_PATH, infoHandler);
    coreAdminHandler   = createHandler(cfg.getCoreAdminHandlerClass(), CoreAdminHandler.class);
    containerHandlers.put(CORES_HANDLER_PATH, coreAdminHandler);
    configSetsHandler = createHandler(cfg.getConfigSetsHandlerClass(), ConfigSetsHandler.class);
    containerHandlers.put(CONFIGSETS_HANDLER_PATH, configSetsHandler);
    containerHandlers.put(AUTHZ_PATH, securityConfHandler);
    containerHandlers.put(AUTHC_PATH, securityConfHandler);
    if(pkiAuthenticationPlugin != null)
      containerHandlers.put(PKIAuthenticationPlugin.PATH, pkiAuthenticationPlugin.getRequestHandler());

    coreConfigService = ConfigSetService.createConfigSetService(cfg, loader, zkSys.zkController);//coreConfigService 管理zk上的config文件

    containerProperties.putAll(cfg.getSolrProperties());

    //开始加载cores
    // setup executor to load cores in parallel
    ExecutorService coreLoadExecutor = ExecutorUtil.newMDCAwareFixedThreadPool(
        cfg.getCoreLoadThreadCount(isZooKeeperAware()),
        new DefaultSolrThreadFactory("coreLoadExecutor") );
    final List<Future<SolrCore>> futures = new ArrayList<>();
    try {
      //从solr home发现cores
      List<CoreDescriptor> cds = coresLocator.discover(this); //CorePropertiesLocator 负责遍历solr_home,发现core.properties
      if (isZooKeeperAware()) {
        //sort the cores if it is in SolrCloud. In standalone node the order does not matter
        CoreSorter coreComparator = new CoreSorter().init(this);
        cds = new ArrayList<>(cds);//make a copy
        Collections.sort(cds, coreComparator::compare);
      }
      checkForDuplicateCoreNames(cds);

      //加载cores
      for (final CoreDescriptor cd : cds) {
        if (cd.isTransient() || !cd.isLoadOnStartup()) {
          solrCores.putDynamicDescriptor(cd.getName(), cd);
        } else if (asyncSolrCoreLoad) {
          solrCores.markCoreAsLoading(cd);
        }
        if (cd.isLoadOnStartup()) {
          futures.add(coreLoadExecutor.submit(() -> {
            SolrCore core;
            try {
              if (zkSys.getZkController() != null) {
                zkSys.getZkController().throwErrorIfReplicaReplaced(cd);
              }

              core = create(cd, false, false); //根据core描述文件,关联zk上的配置文件,状态为down
            } finally {
              if (asyncSolrCoreLoad) {
                solrCores.markCoreAsNotLoading(cd);
              }
            }
            try {
              zkSys.registerInZk(core, true, false);//真正加载core,完成数据的恢复(leader数据迁移到副本),后台执行
            } catch (RuntimeException e) {
              SolrException.log(log, "Error registering SolrCore", e);
            }
            return core;
          }));
        }
      }
      //结束加载cores

      // Start the background thread
      backgroundCloser = new CloserThread(this, solrCores, cfg);
      backgroundCloser.start();

    } finally {
      if (asyncSolrCoreLoad && futures != null) {

        coreContainerWorkExecutor.submit((Runnable) () -> {
          try {
            for (Future<SolrCore> future : futures) {
              try {
                future.get();
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              } catch (ExecutionException e) {
                log.error("Error waiting for SolrCore to be created", e);
              }
            }
          } finally {
            ExecutorUtil.shutdownAndAwaitTermination(coreLoadExecutor);//等待初始化完毕
          }
        });
      } else {
        ExecutorUtil.shutdownAndAwaitTermination(coreLoadExecutor);
      }
    }

    if (isZooKeeperAware()) {
      zkSys.getZkController().checkOverseerDesignate();
    }
  }

如上代码中汉语注释所示,load过程主要完成以下工作:

  • 初始化管理shardshardHandlerFactoryupdateShardHandler
  • 初始化管理admincontainerHandlers
  • 遍历solr_home获取core.properties,获取所有core的元数据
  • 并发加载cores,
    1.初始化core:
    根据core描述文件,关联zk上的配置文件
    此时core状态为down
    完成core的各种handler的初始化
    2.在zk上注册core:
    向zk注册改core
    完成数据的恢复(leader数据迁移到副本)
    此时core的状态为recovery
    ⚠️此过程在后台执行,不影响solr的启动

初始化core


 /**
   * Creates a new core based on a CoreDescriptor.
   *
   * @param dcore        a core descriptor
   * @param publishState publish core state to the cluster if true
   *
   * @return the newly created core
   */
  private SolrCore create(CoreDescriptor dcore, boolean publishState, boolean newCollection) {

    if (isShutDown) {
      throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Solr has been shutdown.");
    }

    SolrCore core = null;
    try {
      MDCLoggingContext.setCore(core);
      SolrIdentifierValidator.validateCoreName(dcore.getName());
      if (zkSys.getZkController() != null) {
        zkSys.getZkController().preRegister(dcore);  //检查state.json,core是否存在,core状态标记为down, 增加对应collection的state.json的监控
      }

      ConfigSet coreConfig = coreConfigService.getConfig(dcore);  //获取core对应的config
      log.info("Creating SolrCore '{}' using configuration from {}", dcore.getName(), coreConfig.getName());
      core = new SolrCore(dcore, coreConfig);   //初始化solr core,包括何种handler

      // always kick off recovery if we are in non-Cloud mode
      if (!isZooKeeperAware() && core.getUpdateHandler().getUpdateLog() != null) {
        core.getUpdateHandler().getUpdateLog().recoverFromLog();
      }

      registerCore(dcore.getName(), core, publishState, newCollection);

      return core;
    } catch (Exception e) {
      coreInitFailures.put(dcore.getName(), new CoreLoadFailure(dcore, e));
      log.error("Error creating core [{}]: {}", dcore.getName(), e.getMessage(), e);
      final SolrException solrException = new SolrException(ErrorCode.SERVER_ERROR, "Unable to create core [" + dcore.getName() + "]", e);
      if(core != null && !core.isClosed())
        IOUtils.closeQuietly(core);
      throw solrException;
    } catch (Throwable t) {
      SolrException e = new SolrException(ErrorCode.SERVER_ERROR, "JVM Error creating core [" + dcore.getName() + "]: " + t.getMessage(), t);
      log.error("Error creating core [{}]: {}", dcore.getName(), t.getMessage(), t);
      coreInitFailures.put(dcore.getName(), new CoreLoadFailure(dcore, e));
      if(core != null && !core.isClosed())
        IOUtils.closeQuietly(core);
      throw t;
    } finally {
      MDCLoggingContext.clear();
    }
  }

如代码注释所示,详细过程不再赘述;

在zk上注册core

public void registerInZk(final SolrCore core, boolean background, boolean skipRecovery) {
    Runnable r = () -> {
      MDCLoggingContext.setCore(core);
      try {
        try {
          zkController.register(core.getName(), core.getCoreDescriptor(), skipRecovery);
        } catch (InterruptedException e) {
          // Restore the interrupted status
          Thread.currentThread().interrupt();
          SolrException.log(log, "", e);
        } catch (Exception e) {
          try {
            zkController.publish(core.getCoreDescriptor(), Replica.State.DOWN);
          } catch (InterruptedException e1) {
            Thread.currentThread().interrupt();
            log.error("", e1);
          } catch (Exception e1) {
            log.error("", e1);
          }
          SolrException.log(log, "", e);
        }
      } finally {
        MDCLoggingContext.clear();
      }
    };

    if (zkController != null) {
      if (background) {   //此处background为true,所以在后台执行,初始化程序,不等待register过程结束
        coreZkRegister.execute(r);
      } else {
        MDCLoggingContext.setCore(core);
        try {
          r.run();
        } finally {
          MDCLoggingContext.clear();
        }
      }
    }
  }

核心过程在zkController.register(core.getName(), core.getCoreDescriptor(), skipRecovery);

/**
   * Register shard with ZooKeeper.
   *
   * @return the shardId for the SolrCore
   */
  public String register(String coreName, final CoreDescriptor desc, boolean recoverReloadedCores,
                         boolean afterExpiration, boolean skipRecovery) throws Exception {
    try (SolrCore core = cc.getCore(desc.getName())) {
      MDCLoggingContext.setCore(core);
    }
    try {
      // pre register has published our down state
      final String baseUrl = getBaseUrl();
      
      final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
      final String collection = cloudDesc.getCollectionName();
      
      final String coreZkNodeName = desc.getCloudDescriptor().getCoreNodeName();
      assert coreZkNodeName != null : "we should have a coreNodeName by now";
      
      String shardId = cloudDesc.getShardId();
      Map<String,Object> props = new HashMap<>();
      // we only put a subset of props into the leader node
      props.put(ZkStateReader.BASE_URL_PROP, baseUrl);
      props.put(ZkStateReader.CORE_NAME_PROP, coreName);
      props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
      
      log.debug("Register replica - core:{} address:{} collection:{} shard:{}",
          coreName, baseUrl, cloudDesc.getCollectionName(), shardId);
      
      ZkNodeProps leaderProps = new ZkNodeProps(props);
      
      try {
        // If we're a preferred leader, insert ourselves at the head of the queue
        boolean joinAtHead = false;
        Replica replica = zkStateReader.getClusterState().getReplica(desc.getCloudDescriptor().getCollectionName(),
            coreZkNodeName);
        if (replica != null) {
          joinAtHead = replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false);
        }
        joinElection(desc, afterExpiration, joinAtHead);
      } catch (InterruptedException e) {
        // Restore the interrupted status
        Thread.currentThread().interrupt();
        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
      } catch (KeeperException | IOException e) {
        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
      }
      
      // in this case, we want to wait for the leader as long as the leader might
      // wait for a vote, at least - but also long enough that a large cluster has
      // time to get its act together
      String leaderUrl = getLeader(cloudDesc, leaderVoteWait + 600000);  // 选举leader /infra-solr/collections/aaa/leader_elect/shard1/election
      
      String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
      log.debug("We are " + ourUrl + " and leader is " + leaderUrl);
      boolean isLeader = leaderUrl.equals(ourUrl);
      
      try (SolrCore core = cc.getCore(desc.getName())) {
        
        // recover from local transaction log and wait for it to complete before
        // going active
        // TODO: should this be moved to another thread? To recoveryStrat?
        // TODO: should this actually be done earlier, before (or as part of)
        // leader election perhaps?
        
        UpdateLog ulog = core.getUpdateHandler().getUpdateLog();   //从tlog重放数据
        
        // we will call register again after zk expiration and on reload
        if (!afterExpiration && !core.isReloaded() && ulog != null) {
          // disable recovery in case shard is in construction state (for shard splits)
          Slice slice = getClusterState().getSlice(collection, shardId);
          if (slice.getState() != Slice.State.CONSTRUCTION || !isLeader) {
            Future<UpdateLog.RecoveryInfo> recoveryFuture = core.getUpdateHandler().getUpdateLog().recoverFromLog();
            if (recoveryFuture != null) {
              log.info("Replaying tlog for " + ourUrl + " during startup... NOTE: This can take a while.");
              recoveryFuture.get(); // NOTE: this could potentially block for
              // minutes or more!
              // TODO: public as recovering in the mean time?
              // TODO: in the future we could do peersync in parallel with recoverFromLog
            } else {
              log.debug("No LogReplay needed for core={} baseURL={}", core.getName(), baseUrl);
            }
          }
        }
        //从leader 恢复数据到同一个数据版本
        boolean didRecovery
            = checkRecovery(recoverReloadedCores, isLeader, skipRecovery, collection, coreZkNodeName, core, cc, afterExpiration);
        if (!didRecovery) {
          publish(desc, Replica.State.ACTIVE);  //状态变为active
        }
        
        core.getCoreDescriptor().getCloudDescriptor().setHasRegistered(true);
      }
      
      // make sure we have an update cluster state right away
      zkStateReader.forceUpdateCollection(collection);
      return shardId;
    } finally {
      MDCLoggingContext.clear();
    }
  }

如上注释所示,并不是简单的在zk中注册这个core,经过了以下过程:

  • leader选举
  • 从tlog重放数据,恢复现场
  • 从leader同步数据(如果它不是leader)
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,588评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,456评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,146评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,387评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,481评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,510评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,522评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,296评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,745评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,039评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,202评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,901评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,538评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,165评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,415评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,081评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,085评论 2 352

推荐阅读更多精彩内容