Presto调度模块源码阅读(2)-服务端响应-资源组选择

当用户提交一个SQL作业时,Presto客户端会封装一个Request通过Restful接口将请求发送到服务端,下面就详细讲解一下服务端的处理过程。

Client端发送请求的地址是/v1/statement,对应到StatementResource的createQuery方法。在该方法中会调用Query的static方法create,在create方法中new了一个Query对象,然后会调用SqlQueryManager的createQuery方法。

在createQuery方法中首先会创建QueryId,生成规则是:

return new QueryId(String.format("%s_%05d_%s", lastTimestamp, counter++, coordinatorId));

然后presto会判断集群是否有可用节点,其中isIncludeCoordinator变量对应config.properties配置文件中的node-scheduler.include-coordinator配置项,表示是否允许调度task到coordinator节点进行计算。

如果集群可用节点小于最小值1(参数query-manager.initialization-required-workers),则给出熟悉的报错信息“Cluster is still initializing……”。

            if (!acceptQueries.get()) {
                int activeWorkerCount = internalNodeManager.getNodes(ACTIVE).size();
                if (!isIncludeCoordinator) {
                    activeWorkerCount--;
                }
                if (nanosSince(initialNanos).compareTo(initializationTimeout) < 0 && activeWorkerCount < initializationRequiredWorkers) {
                    throw new PrestoException(
                            SERVER_STARTING_UP,
                            String.format("Cluster is still initializing, there are insufficient active worker nodes (%s) to run query", activeWorkerCount));
                }
                acceptQueries.set(true);
            }

除此之外presto还对sql长度做了限制,要求不能超过query.max-length(默认1_000_000_000,表示10亿)。

然后presto会根据提交作业的客户端信息选择资源组。

            Optional<String> queryType = getQueryType(query);
            selectionContext = resourceGroupManager.selectGroup(new SelectionCriteria(
                    sessionContext.getIdentity().getPrincipal().isPresent(),
                    sessionContext.getIdentity().getUser(),
                    Optional.ofNullable(sessionContext.getSource()),
                    sessionContext.getClientTags(),
                    sessionContext.getResourceEstimates(),
                    queryType));

上图代码中selectGroup方法对应到InternalResourceGroupManager的selectGroup方法,其中configurationManager的类型是AtomicReference<ResourceGroupConfigurationManager<C>>。selectGroup方法实现如下:

    @Override
    public SelectionContext<C> selectGroup(SelectionCriteria criteria)
    {
        return configurationManager.get().match(criteria)
                .orElseThrow(() -> new PrestoException(QUERY_REJECTED, "Query did not match any selection rule"));
    }

然后我们点进match方法,来到了ResourceGroupConfigurationManager接口中,我们看到这个方法的实现类有如下三个:


ResourceGroupConfigurationManager接口实现类

那么问题来了,当我们调用match方法时,执行的是这三个实现类中的哪一个呢?

我们首先看一下configurationManager初始化时的值,如下图所示初始化时其类型为LegacyResourceGroupConfigurationManager:

    @Inject
    public InternalResourceGroupManager(LegacyResourceGroupConfigurationManager legacyManager, ClusterMemoryPoolManager memoryPoolManager, NodeInfo nodeInfo, MBeanExporter exporter)
    {
        this.exporter = requireNonNull(exporter, "exporter is null");
        this.configurationManagerContext = new ResourceGroupConfigurationManagerContextInstance(memoryPoolManager, nodeInfo.getEnvironment());
        this.legacyManager = requireNonNull(legacyManager, "legacyManager is null");
        this.configurationManager = new AtomicReference<>(cast(legacyManager));
    }

然后我们搜一下configurationManager的引用,发现在InternalResourceGroupManager类的setConfigurationManager方法中修改了他的值。如下图:

    @VisibleForTesting
    public void setConfigurationManager(String name, Map<String, String> properties)
    {
        requireNonNull(name, "name is null");
        requireNonNull(properties, "properties is null");

        log.info("-- Loading resource group configuration manager --");

        ResourceGroupConfigurationManagerFactory configurationManagerFactory = configurationManagerFactories.get(name);
        checkState(configurationManagerFactory != null, "Resource group configuration manager %s is not registered", name);

        ResourceGroupConfigurationManager<C> configurationManager = cast(configurationManagerFactory.create(ImmutableMap.copyOf(properties), configurationManagerContext));
        checkState(this.configurationManager.compareAndSet(cast(legacyManager), configurationManager), "configurationManager already set");

        log.info("-- Loaded resource group configuration manager %s --", name);
    }

该方法在同一个类的loadConfigurationManager方法中被调用。loadConfigurationManager方法会判断常量RESOURCE_GROUPS_CONFIGURATION对应的etc/resource-groups.properties文件是否存在,如果存在会读取文件中配置的resource-groups.configuration-manager参数为Key值,到configurationManagerFactories中取出对应的ResourceGroupConfigurationManagerFactory对象,然后调用其create方法构造一个ResourceGroupConfigurationManager对象,最终赋值给configurationManager。方法的实现如下:

    @Override
    public void loadConfigurationManager()
            throws Exception
    {
        if (RESOURCE_GROUPS_CONFIGURATION.exists()) {
            Map<String, String> properties = new HashMap<>(loadProperties(RESOURCE_GROUPS_CONFIGURATION));

            String configurationManagerName = properties.remove(CONFIGURATION_MANAGER_PROPERTY_NAME);
            checkArgument(!isNullOrEmpty(configurationManagerName),
                    "Resource groups configuration %s does not contain %s", RESOURCE_GROUPS_CONFIGURATION.getAbsoluteFile(), CONFIGURATION_MANAGER_PROPERTY_NAME);

            setConfigurationManager(configurationManagerName, properties);
        }
    }

而loadConfigurationManager方法又在PrestoServer类的初始化方法中被调用。

injector.getInstance(ResourceGroupManager.class).loadConfigurationManager();

(PS:ResourceGroupManager的实现类型是在CoordinatorModule这个类中被注入的:

binder.bind(ResourceGroupManager.class).to(InternalResourceGroupManager.class);

也就是说,当PrestoServer通过其main方法调用run方法进行初始化时, 会读取etc/resource-groups.properties文件中的配置项resource-groups.configuration-manager,再以它为Key值读取configurationManagerFactories中对应的ResourceGroupConfigurationManagerFactory,然后调用读取出来的工厂类的create方法构建ResourceGroupConfigurationManager对象,最后赋值给InternalResourceGroupManager类的configurationManager。

另一个问题出现了,configurationManagerFactories这个Map类型的全局变量是在什么时候赋值的,里边都有哪些值呢?

我们还是搜索一下它的引用,发现在InternalResourceGroupManager的addConfigurationManagerFactory方法中对其进行了putIfAbsent操作(不存在则put)。

    @Override
    public void addConfigurationManagerFactory(ResourceGroupConfigurationManagerFactory factory)
    {
        if (configurationManagerFactories.putIfAbsent(factory.getName(), factory) != null) {
            throw new IllegalArgumentException(format("Resource group configuration manager '%s' is already registered", factory.getName()));
        }
    }

搜索引用发现,在PluginManager的installPlugin方法中调用了这个方法:

        for (ResourceGroupConfigurationManagerFactory configurationManagerFactory : plugin.getResourceGroupConfigurationManagerFactories()) {
            log.info("Registering resource group configuration manager %s", configurationManagerFactory.getName());
            resourceGroupManager.addConfigurationManagerFactory(configurationManagerFactory);
        }

然后我们看一下plugin.getResourceGroupConfigurationManagerFactories方法的定义,发现他有两个实现类,


Plugin接口的两个实现类

ResourceGroupManagerPlugin的实现如下:

public class ResourceGroupManagerPlugin
        implements Plugin
{
    @Override
    public Iterable<ResourceGroupConfigurationManagerFactory> getResourceGroupConfigurationManagerFactories()
    {
        return ImmutableList.of(
                new FileResourceGroupConfigurationManagerFactory(getClassLoader()),
                new DbResourceGroupConfigurationManagerFactory(getClassLoader()));
    }

H2ResourceGroupManagerPlugin的实现如下:

public class H2ResourceGroupManagerPlugin
        implements Plugin
{
    @Override
    public Iterable<ResourceGroupConfigurationManagerFactory> getResourceGroupConfigurationManagerFactories()
    {
        return ImmutableList.of(
                new H2ResourceGroupConfigurationManagerFactory(getClassLoader()));
    }

我们在addConfigurationManagerFactory方法中可以看到,添加到configurationManagerFactories这个Map中时,是以factory的name作为Key值,factory为Value的:

    @Override
    public void addConfigurationManagerFactory(ResourceGroupConfigurationManagerFactory factory)
    {
        if (configurationManagerFactories.putIfAbsent(factory.getName(), factory) != null) {
            throw new IllegalArgumentException(format("Resource group configuration manager '%s' is already registered", factory.getName()));
        }
    }

所以我们看一下这三个实现类对应的name值,也就是resource-groups.configuration-manager参数的可选值:
db:DbResourceGroupConfigurationManagerFactory
h2:H2ResourceGroupConfigurationManagerFactory
file:FileResourceGroupConfigurationManagerFactory

然后,我们回过头来看一下PluginManager的installPlugin方法,该方法在同类的loadPlugin方法中被调用,

    private void loadPlugin(URLClassLoader pluginClassLoader)
    {
        ServiceLoader<Plugin> serviceLoader = ServiceLoader.load(Plugin.class, pluginClassLoader);
        List<Plugin> plugins = ImmutableList.copyOf(serviceLoader);

        if (plugins.isEmpty()) {
            log.warn("No service providers of type %s", Plugin.class.getName());
        }

        for (Plugin plugin : plugins) {
            log.info("Installing %s", plugin.getClass().getName());
            installPlugin(plugin);
        }
    }

loadPlugin方法又在该类中再次被调用:

    private void loadPlugin(String plugin)
            throws Exception
    {
        log.info("-- Loading plugin %s --", plugin);
        URLClassLoader pluginClassLoader = buildClassLoader(plugin);
        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(pluginClassLoader)) {
            loadPlugin(pluginClassLoader);
        }
        log.info("-- Finished loading plugin %s --", plugin);
    }

再往上是loadPlugins方法:

    public void loadPlugins()
            throws Exception
    {
        if (!pluginsLoading.compareAndSet(false, true)) {
            return;
        }

        for (File file : listFiles(installedPluginsDir)) {
            if (file.isDirectory()) {
                loadPlugin(file.getAbsolutePath());
            }
        }

        for (String plugin : plugins) {
            loadPlugin(plugin);
        }

        metadata.verifyComparableOrderableContract();

        pluginsLoaded.set(true);
    }

再次向上查找,原来loadPlugins方法是在PrestoServer的run方法中,先与loadConfigurationManager方法被调用的:

injector.getInstance(PluginManager.class).loadPlugins();

也就是说,Presto默认是按照LegacyResourceGroupConfigurationManager进行资源组管理的。

在PrestoServer调用run方法进行初始化时,首先会执行PluginManager的loadPlugins方法,向InternalResourceGroupManager中一个存放ResourceGroupManagerFactory类型元素的Map添加可用的资源组管理工厂类。

然后会调用InternalResourceGroupManager的loadConfigurationManager方法,判断是否配置了参数resource-groups.configuration-manager,如果配置了则会按照配置的manager类型从这个Map中根据ResourceGroupFactory的name取出相应的factory。

最后会根据取出的factory对象create一个ResourceGroupConfigurationManager,并将其赋值给configurationManager。

在Presto的官方文档中我们看到,presto只描述了一种name为file的ResourceGroupManagerFactory,对应FileResourceGroupConfigurationManagerFactory。看来这是官方比较推荐的类型。

===============
Resource Groups
===============

Resource groups place limits on resource usage, and can enforce queueing policies on
queries that run within them or divide their resources among sub groups. A query
belongs to a single resource group, and consumes resources from that group (and its ancestors).
Except for the limit on queued queries, when a resource group runs out of a resource
it does not cause running queries to fail; instead new queries become queued.
A resource group may have sub groups or may accept queries, but may not do both.

The resource groups and associated selection rules are configured by a manager which is pluggable.
Add an ``etc/resource-groups.properties`` file with the following contents to enable
the built-in manager that reads a JSON config file:

.. code-block:: none

    resource-groups.configuration-manager=file
    resource-groups.config-file=etc/resource_groups.json

Change the value of ``resource-groups.config-file`` to point to a JSON config file,
which can be an absolute path, or a path relative to the Presto data directory.

Resource Group Properties
-------------------------

接下来我们看一下FileResourceGroupConfigurationManager类的match方法,如下图:

    @Override
    public Optional<SelectionContext<VariableMap>> match(SelectionCriteria criteria)
    {
        return selectors.stream()
                .map(s -> s.match(criteria))
                .filter(Optional::isPresent)
                .map(Optional::get)
                .findFirst();
    }

入参SelectionCriteria是从session中取得的用户信息,如下图:

            selectionContext = resourceGroupManager.selectGroup(new SelectionCriteria(
                    sessionContext.getIdentity().getPrincipal().isPresent(),
                    sessionContext.getIdentity().getUser(),
                    Optional.ofNullable(sessionContext.getSource()),
                    sessionContext.getClientTags(),
                    sessionContext.getResourceEstimates(),
                    queryType));

从match方法可以看到他会从selectors中找到跟session中用户信息相匹配的ResourceGroupSelector,如果得到的Optional对象不存在value,则给出熟悉的异常信息Query did not match any selection rule,如果存在value则作业继续向下执行。

selectors对象是从resource-groups.config-file配置项指定的文件中解析得到的ResourceGroup配置信息。其初始化的代码是在FileResourceGroupConfigurationManager的构造函数中,如下:

    @Inject
    public FileResourceGroupConfigurationManager(ClusterMemoryPoolManager memoryPoolManager, FileResourceGroupConfig config)
    {
        super(memoryPoolManager);
        requireNonNull(config, "config is null");

        ManagerSpec managerSpec;
        try {
            managerSpec = CODEC.fromJson(Files.readAllBytes(Paths.get(config.getConfigFile())));
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        catch (IllegalArgumentException e) {
            Throwable cause = e.getCause();
            if (cause instanceof UnrecognizedPropertyException) {
                UnrecognizedPropertyException ex = (UnrecognizedPropertyException) cause;
                String message = format("Unknown property at line %s:%s: %s",
                        ex.getLocation().getLineNr(),
                        ex.getLocation().getColumnNr(),
                        ex.getPropertyName());
                throw new IllegalArgumentException(message, e);
            }
            if (cause instanceof JsonMappingException) {
                // remove the extra "through reference chain" message
                if (cause.getCause() != null) {
                    cause = cause.getCause();
                }
                throw new IllegalArgumentException(cause.getMessage(), e);
            }
            throw e;
        }

        this.rootGroups = managerSpec.getRootGroups();
        this.cpuQuotaPeriod = managerSpec.getCpuQuotaPeriod();
        validateRootGroups(managerSpec);
        this.selectors = buildSelectors(managerSpec);
    }

其中,config.getConfigFile方法对应配置项resource-groups.config-file:

    @NotNull
    public String getConfigFile()
    {
        return configFile;
    }

    @Config("resource-groups.config-file")
    public FileResourceGroupConfig setConfigFile(String configFile)
    {
        this.configFile = configFile;
        return this;
    }

在buildSelectors方法中可以看到selectors中添加的对象类型是StaticSelector,这样在match方法的lambda表达式s -> s.match中,s对象就是StaticSelector类型的了。

    protected List<ResourceGroupSelector> buildSelectors(ManagerSpec managerSpec)
    {
        ImmutableList.Builder<ResourceGroupSelector> selectors = ImmutableList.builder();
        for (SelectorSpec spec : managerSpec.getSelectors()) {
            validateSelectors(managerSpec.getRootGroups(), spec);
            selectors.add(new StaticSelector(
                    spec.getUserRegex(),
                    spec.getSourceRegex(),
                    spec.getClientTags(),
                    spec.getResourceEstimate(),
                    spec.getQueryType(),
                    spec.getGroup()));
        }
        return selectors.build();
    }

在StaticSelector的match方法中我们看到,它会根据json文件中读取到的信息与客户端信息依次做校验,如校验不通过则返回一个没有值的Optional对象,以便selectGroup方法抛出异常。如果全部校验通过,最终会封装一个SelectionContext类型的Optional对象返回。

    @Override
    public Optional<SelectionContext<VariableMap>> match(SelectionCriteria criteria)
    {
        Map<String, String> variables = new HashMap<>();

        if (userRegex.isPresent()) {
            Matcher userMatcher = userRegex.get().matcher(criteria.getUser());
            if (!userMatcher.matches()) {
                return Optional.empty();
            }

            addVariableValues(userRegex.get(), criteria.getUser(), variables);
        }

        if (sourceRegex.isPresent()) {
            String source = criteria.getSource().orElse("");
            if (!sourceRegex.get().matcher(source).matches()) {
                return Optional.empty();
            }

            addVariableValues(sourceRegex.get(), source, variables);
        }

        if (!clientTags.isEmpty() && !criteria.getTags().containsAll(clientTags)) {
            return Optional.empty();
        }

        if (selectorResourceEstimate.isPresent() && !selectorResourceEstimate.get().match(criteria.getResourceEstimates())) {
            return Optional.empty();
        }

        if (queryType.isPresent()) {
            String contextQueryType = criteria.getQueryType().orElse("");
            if (!queryType.get().equalsIgnoreCase(contextQueryType)) {
                return Optional.empty();
            }
        }

        variables.putIfAbsent(USER_VARIABLE, criteria.getUser());

        // Special handling for source, which is an optional field that is part of the standard variables
        variables.putIfAbsent(SOURCE_VARIABLE, criteria.getSource().orElse(""));

        VariableMap map = new VariableMap(variables);
        ResourceGroupId id = group.expandTemplate(map);

        return Optional.of(new SelectionContext<>(id, map));
    }

以上就是Presto资源组校验的代码,后续将继续整理服务端响应作业提交请求的代码。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,384评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,845评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,148评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,640评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,731评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,712评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,703评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,473评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,915评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,227评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,384评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,063评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,706评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,302评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,531评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,321评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,248评论 2 352