Implementing Multiple Coordinators in Presto

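Preface:
Presto is an open-source big-data OLAP engine developed by Facebook. It is built on in-memory computation and distributed job execution, analyzes data directly with SQL without any preprocessing, and is similar to Impala and Spark SQL. It stores no data itself; it reads data by connecting to data sources.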

Presto's architecture has a few main roles:

Coordinator:
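The Coordinator parses statements, plans queries, and manages the Presto worker nodes. Every Presto installation must have one Coordinator and one or more workers.
The Coordinator tracks the activity of each worker and coordinates query execution. For each query it builds a model made up of several stages; each stage is turned into tasks that are distributed to different workers. The Coordinator then collects the final results and returns them to the client.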
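To make the query, stage and task breakdown concrete, here is a toy Java model. It only illustrates splitting a query into stages and spreading each stage's tasks across workers; Presto's real planner and scheduler are far more involved, and the class ToyCoordinator, its round-robin assignment, and the worker names are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: a "query" is split into stages, each stage into tasks, and
// tasks are handed to workers round-robin. Not Presto's real planner/scheduler;
// all names here are hypothetical.
class ToyCoordinator {
    record Task(int stageId, int taskId, String worker) {}

    private final List<String> workers;
    private int next = 0; // round-robin cursor over the worker list

    ToyCoordinator(List<String> workers) {
        if (workers.isEmpty()) {
            throw new IllegalArgumentException("at least one worker is required");
        }
        this.workers = List.copyOf(workers);
    }

    /** Plans `stages` stages of `tasksPerStage` tasks each and assigns every task to a worker. */
    List<Task> schedule(int stages, int tasksPerStage) {
        List<Task> tasks = new ArrayList<>();
        for (int s = 0; s < stages; s++) {
            for (int t = 0; t < tasksPerStage; t++) {
                tasks.add(new Task(s, t, workers.get(next)));
                next = (next + 1) % workers.size();
            }
        }
        return tasks;
    }

    public static void main(String[] args) {
        ToyCoordinator coordinator = new ToyCoordinator(List.of("worker-1", "worker-2", "worker-3"));
        coordinator.schedule(2, 3).forEach(System.out::println);
    }
}
```

With two stages of three tasks on three workers, each worker ends up with two tasks.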

Worker:
Workers execute tasks and process data.

Discovery Server:
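Usually embedded in the coordinator node, although it can also be deployed separately. Workers register with it, so the Coordinator knows how many Workers there are and what their addresses are.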
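The registration role can be pictured as a tiny in-memory registry: nodes announce themselves, and a coordinator reads back the worker list. This is a hypothetical stand-in (ToyDiscovery and its methods are invented for illustration) and does not reflect the Discovery Server's actual HTTP API or expiry rules; the worker addresses in main are placeholders.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

// Toy in-memory stand-in for a discovery service: nodes announce themselves,
// and a coordinator asks which workers exist and where they are.
// NOT Presto's Discovery Server API; all names are hypothetical.
class ToyDiscovery {
    record Node(String nodeId, String httpUri, boolean coordinator) {}

    private final Map<String, Node> nodes = new ConcurrentHashMap<>();

    /** A node (re-)announces itself; a later announcement replaces the earlier one. */
    void announce(String nodeId, String httpUri, boolean coordinator) {
        nodes.put(nodeId, new Node(nodeId, httpUri, coordinator));
    }

    /** What a coordinator needs: the worker nodes and their addresses, sorted by node id. */
    List<Node> workers() {
        return nodes.values().stream()
                .filter(n -> !n.coordinator())
                .sorted(Comparator.comparing(Node::nodeId))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ToyDiscovery discovery = new ToyDiscovery();
        discovery.announce("coordinator-1", "http://10.100.218.110:8090", true);
        // Placeholder worker addresses
        discovery.announce("worker-a", "http://worker-a.example:8090", false);
        discovery.announce("worker-b", "http://worker-b.example:8090", false);
        System.out.println(discovery.workers().size() + " workers: " + discovery.workers());
    }
}
```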

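Problem:
As mentioned above, a Presto cluster runs analysis tasks in memory and in parallel. For a single cluster with only a few machines, concurrency is low, but that is a secondary issue. Much more often, CPU and memory get completely maxed out, which directly hurts availability (the root cause is simply a tight budget: the hardware is low-end and there are only three machines).

The most obvious bottleneck showed up first on the Coordinator. When several queries ran concurrently, some were blocked outright and memory filled up completely; jstat showed garbage collection running nonstop. The Workers, by contrast, were fine.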

[Screenshot, 2021-03-31 16:29:21]
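The jstat observation can also be reproduced from inside a JVM with the standard java.lang.management API. The sketch below (the class GcWatch is hypothetical) samples each collector's cumulative collection count one second apart and prints the delta plus heap usage. It measures only the JVM it runs in; watching a coordinator this way would mean reading the same MXBeans over remote JMX, which is an assumption about your deployment.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.util.LinkedHashMap;
import java.util.Map;

// In-process counterpart to watching jstat: sample the collectors' counters twice
// and look at the delta. A delta that keeps growing while the heap stays near its
// maximum is the "GC never stops" symptom.
class GcWatch {
    /** Cumulative collection count per collector (-1 if the JVM does not report it). */
    static Map<String, Long> collectionCounts() {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            counts.put(gc.getName(), gc.getCollectionCount());
        }
        return counts;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, Long> before = collectionCounts();
        Thread.sleep(1000);
        Map<String, Long> after = collectionCounts();
        after.forEach((name, count) -> System.out.printf(
                "%s: +%d collections in 1s%n", name, count - before.getOrDefault(name, 0L)));
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        // getMax() may be -1 when the maximum is undefined
        System.out.printf("heap used %d MB of max %d MB%n", heap.getUsed() >> 20, heap.getMax() >> 20);
    }
}
```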

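My initial plan was to add a second coordinator that uses the cluster's existing Discovery Server, and to put nginx in front as a load balancer that splits requests between the two coordinators, relieving pressure on the Coordinator and increasing concurrency.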

nginx.conf

    # No balancing method is given, so nginx uses its default round-robin
    upstream backend {
        server 10.100.218.110:8090;
        server 10.101.126.93:8090;
    }


    server {
        listen       8060;
        server_name  tt;

        location / {
            root   html;
            index  index.html index.htm;
            proxy_pass http://backend;
            proxy_redirect off;
            proxy_set_header Host $http_host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        }

        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }

    }

Coordinator-1 (the original coordinator) config:

coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8090
discovery-server.enabled=true
discovery.uri=http://10.100.218.110:8090
query.max-memory-per-node=6GB
query.max-total-memory-per-node=8GB
query.low-memory-killer.policy=total-reservation
http-server.max-request-header-size=32MB
http-server.max-response-header-size=32MB
task.concurrency=16

Coordinator-2 (the newly added coordinator) config:

coordinator=true
node-scheduler.include-coordinator=false
http-server.http.port=8090
discovery-server.enabled=false
discovery.uri=http://10.100.218.110:8090
query.max-memory-per-node=4GB
query.max-total-memory-per-node=5GB
query.low-memory-killer.policy=total-reservation
http-server.max-request-header-size=32MB
http-server.max-response-header-size=32MB
task.concurrency=16

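But it went wrong: every query failed. The reason is that presto-jdbc does not run a query with a single HTTP request. It sends several requests: to prepare the statement, bind values, check the queue status, fetch the results, and so on. Because nginx distributes each request independently (round-robin by default), the follow-up requests of a query can land on the other coordinator, which knows nothing about that query. A packet capture with Fiddler confirmed that each query involves multiple requests.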

[Screenshot: Fiddler capture of the presto-jdbc requests]
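The failure mode can be reproduced without a real cluster. The simulation below assumes a simplified protocol (one "submit" request followed by "poll"/"fetch" requests for the same query) and two coordinators that each only know the queries submitted to them; RoundRobinDemo and the request names are hypothetical. Because the proxy picks the next upstream for every request, the second request of a query already lands on the coordinator that never saw it.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simulation, not real HTTP: a proxy that round-robins every request in front of
// two coordinators that each only know the queries submitted to them.
// The request sequence is a simplification of the submit-then-poll pattern.
class RoundRobinDemo {
    static class Coordinator {
        final String name;
        final Set<String> knownQueries = new HashSet<>();

        Coordinator(String name) {
            this.name = name;
        }

        String handle(String request, String queryId) {
            if (request.equals("submit")) {
                knownQueries.add(queryId);
                return name + ": accepted " + queryId;
            }
            // Follow-up requests only work on the coordinator that holds the query
            return knownQueries.contains(queryId)
                    ? name + ": " + request + " ok"
                    : name + ": query " + queryId + " not found";
        }
    }

    static List<String> run(List<Coordinator> upstream, List<String> requests, String queryId) {
        List<String> responses = new ArrayList<>();
        int next = 0; // nginx-style round-robin cursor
        for (String request : requests) {
            Coordinator target = upstream.get(next);
            next = (next + 1) % upstream.size();
            responses.add(target.handle(request, queryId));
        }
        return responses;
    }

    public static void main(String[] args) {
        List<Coordinator> upstream = List.of(new Coordinator("coordinator-1"), new Coordinator("coordinator-2"));
        run(upstream, List.of("submit", "poll", "poll", "fetch"), "q1").forEach(System.out::println);
    }
}
```

With two coordinators, every other follow-up request misses, which is consistent with every query failing.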

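To solve this, all of these requests must be routed to the same Coordinator node. My first idea was to intercept the requests as the presto-jdbc driver sends them, force the thread id into a header, and have nginx route each request by that thread id modulo the number of coordinators.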
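The header-based idea rests on one property: requests carrying the same key always reach the same upstream. The sketch below shows that rule with a simple hash modulo (StickyRouter is hypothetical; nginx would apply its own hashing to a header value, and the header name would be your choice). The key must be identical for every request of a query, so a thread id only works if the interceptor sees the same thread for all of a query's requests, which depends on the driver's internals.

```java
import java.util.List;

// Sketch of the routing rule the header idea relies on: every request that carries
// the same route key goes to the same upstream. Hypothetical helper, not nginx code.
class StickyRouter {
    private final List<String> upstreams;

    StickyRouter(List<String> upstreams) {
        this.upstreams = List.copyOf(upstreams);
    }

    /** Same key, same upstream; floorMod keeps the index non-negative for negative hash codes. */
    String route(String routeKey) {
        return upstreams.get(Math.floorMod(routeKey.hashCode(), upstreams.size()));
    }

    public static void main(String[] args) {
        StickyRouter router = new StickyRouter(List.of("10.100.218.110:8090", "10.101.126.93:8090"));
        String key = String.valueOf(Thread.currentThread().getId());
        // Submit, poll and fetch for one query carry the same key, so they all agree
        for (String step : List.of("submit", "poll", "fetch")) {
            System.out.println(step + " -> " + router.route(key));
        }
    }
}
```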

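Later I found that routing on the application side also works: inject two data sources and, when a connection is requested, pick one by the thread id modulo the number of data sources.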

PrestoRouteDataSource (multi-data-source implementation)


import com.zaxxer.hikari.HikariDataSource;

import javax.sql.DataSource;
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Logger;

/**
 * Routes connections across several Presto coordinators.
 * Each coordinator gets its own HikariCP pool. A JDBC connection is bound to one
 * coordinator URL, so every HTTP request of a query run on it reaches that coordinator.
 */
public class PrestoRouteDataSource implements DataSource, AutoCloseable {

    // One connection pool per coordinator
    private final List<HikariDataSource> corAddr = new CopyOnWriteArrayList<>();

    /**
     * @param addr JDBC URLs separated by ';', e.g.
     *             jdbc:presto://10.100.218.110:8090/kudu/tclydm;jdbc:presto://10.101.126.93:8090/kudu/tclydm
     */
    public PrestoRouteDataSource(String addr) {
        for (String s : addr.split(";")) {
            String url = s.trim();
            if (url.isEmpty()) {
                continue; // tolerate a trailing ';'
            }
            HikariDataSource hikariDataSource = new HikariDataSource();
            hikariDataSource.setJdbcUrl(url);
            hikariDataSource.setUsername("root");
            hikariDataSource.setDriverClassName("com.facebook.presto.jdbc.PrestoDriver");
            hikariDataSource.setPoolName("presto:" + url);
            corAddr.add(hikariDataSource);
        }
        if (corAddr.isEmpty()) {
            throw new IllegalArgumentException("no Presto JDBC URL configured");
        }
    }

    /** Picks a coordinator pool by the current thread id modulo the number of coordinators. */
    private DataSource getPrestoRouteDataSource() {
        long threadId = Thread.currentThread().getId();
        int index = (int) Math.floorMod(threadId, (long) corAddr.size());
        return corAddr.get(index);
    }

    @Override
    public Connection getConnection() throws SQLException {
        return getPrestoRouteDataSource().getConnection();
    }

    /** Credentials are configured per pool, so the arguments are ignored. */
    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        return getPrestoRouteDataSource().getConnection();
    }

    /** Closes every pool; Spring calls a public close() on a @Bean automatically at shutdown. */
    @Override
    public void close() {
        corAddr.forEach(HikariDataSource::close);
    }

    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        if (iface.isInstance(this)) {
            return iface.cast(this);
        }
        throw new SQLException("Not a wrapper for " + iface.getName());
    }

    @Override
    public boolean isWrapperFor(Class<?> iface) {
        return iface.isInstance(this);
    }

    @Override
    public PrintWriter getLogWriter() {
        return null; // no log writer; the pools log through their own logging framework
    }

    @Override
    public void setLogWriter(PrintWriter out) throws SQLException {
        throw new SQLFeatureNotSupportedException("setLogWriter");
    }

    @Override
    public void setLoginTimeout(int seconds) throws SQLException {
        throw new SQLFeatureNotSupportedException("setLoginTimeout");
    }

    @Override
    public int getLoginTimeout() {
        return 0; // 0 means the driver/system default
    }

    @Override
    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
        throw new SQLFeatureNotSupportedException("getParentLogger");
    }
}
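Since each pool in PrestoRouteDataSource is bound to a single coordinator URL, every HTTP request of a query run on one connection goes to that coordinator no matter how the pool was picked; the thread id is one selection rule among several. A hypothetical alternative is a shared counter that rotates through the pools on each getConnection() call, which spreads connections evenly even when thread ids are unevenly distributed. The generic RoundRobinSelector below sketches only that selection logic, with strings standing in for the pools.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical alternative to thread-id routing: rotate through the targets on each call.
// Generic so it could hold DataSource instances; exercised here with strings.
class RoundRobinSelector<T> {
    private final List<T> targets;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinSelector(List<T> targets) {
        if (targets.isEmpty()) {
            throw new IllegalArgumentException("at least one target is required");
        }
        this.targets = List.copyOf(targets);
    }

    /** Thread-safe; floorMod keeps the index valid after the counter overflows. */
    T next() {
        return targets.get(Math.floorMod(counter.getAndIncrement(), targets.size()));
    }

    public static void main(String[] args) {
        RoundRobinSelector<String> pools =
                new RoundRobinSelector<>(List.of("presto:coordinator-1", "presto:coordinator-2"));
        for (int i = 0; i < 4; i++) {
            System.out.println(pools.next());
        }
    }
}
```

Wired in, getPrestoRouteDataSource() would return the selector's next() instead of indexing by thread id. The trade-off: thread-id routing pins a given thread to one coordinator, whereas round-robin does not.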


Data source configuration


import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;

import javax.sql.DataSource;
import java.io.IOException;
import java.util.*;


@Configuration
@MapperScan(basePackages = {"lvye.java.datamiddle.dao.presto"}, sqlSessionFactoryRef = "prestoSqlSessionFactory")
public class PrestoDataSourceConfig {

    @Value("${presto.jdbc-url}")
    private String url;

    @Bean(name = "prestoDataSource")
    public DataSource prestoDataSource() {
        return new PrestoRouteDataSource(url);

    }

    @Bean(name = "prestoSqlSessionFactory")
    public SqlSessionFactory prestoSqlSessionFactory(@Qualifier("prestoDataSource") DataSource prestoDataSource)
            throws Exception {
        final SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
        sessionFactory.setTypeAliasesPackage("lvye.java.datamiddle.model.entity");
        sessionFactory.setDataSource(prestoDataSource);
        sessionFactory.setMapperLocations(resolveMapperLocations());
        // getObject() builds the factory once and returns the same instance on later calls
        SqlSessionFactory factory = Objects.requireNonNull(sessionFactory.getObject());
        // Enable snake_case column to camelCase property mapping
        factory.getConfiguration().setMapUnderscoreToCamelCase(true);
        factory.getConfiguration().setJdbcTypeForNull(null);
        return factory;
    }

    private Resource[] resolveMapperLocations() {
        ResourcePatternResolver resourceResolver = new PathMatchingResourcePatternResolver();
        List<String> mapperLocations = new ArrayList<>();
        mapperLocations.add("classpath*:/mapper/presto/*.xml");
        List<Resource> resources = new ArrayList<>();
        for (String mapperLocation : mapperLocations) {
            try {
                Resource[] mappers = resourceResolver.getResources(mapperLocation);
                resources.addAll(Arrays.asList(mappers));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return resources.toArray(new Resource[0]);
    }


    @Bean(name = "prestoTransactionManager")
    public DataSourceTransactionManager prestoTransactionManager(@Qualifier("prestoDataSource") DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

    @Bean(name = "prestoTemplate")
    public SqlSessionTemplate prestoJdbcTemplate(@Qualifier("prestoSqlSessionFactory") SqlSessionFactory sqlSessionFactory) {
        return new SqlSessionTemplate(sqlSessionFactory);
    }

}


application.properties

presto.jdbc-url=jdbc:presto://10.100.218.110:8090/kudu/tclydm;jdbc:presto://10.101.126.93:8090/kudu/tclydm

Test 1 (multiple coordinators)


[Screenshot: multic.jpg, multi-coordinator test run]

Test 2 (single coordinator)


[Screenshot: single.jpg, single-coordinator test run]

Performance comparison:
(Multiple coordinators) total: 37.66 s, average: 3.13 s, max: 6.67 s, min: 764 ms

(Single coordinator) total: 131.01 s, average: 10.91 s, max: 15.78 s, min: 1.6 s
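Expressed as single-coordinator time divided by multi-coordinator time, the figures above work out as follows (plain arithmetic on the reported numbers). The last line is an inference rather than something the test output states: total divided by average is about 12 in both runs, which suggests each total is the sum over roughly 12 queries.

```latex
\begin{aligned}
\text{total:}\quad & \frac{131.01\ \text{s}}{37.66\ \text{s}} \approx 3.48 \\
\text{average:}\quad & \frac{10.91\ \text{s}}{3.13\ \text{s}} \approx 3.49 \\
\text{maximum:}\quad & \frac{15.78\ \text{s}}{6.67\ \text{s}} \approx 2.37 \\
\text{minimum:}\quad & \frac{1.6\ \text{s}}{0.764\ \text{s}} \approx 2.09 \\
\text{queries per run:}\quad & \frac{37.66}{3.13} \approx 12.0,\qquad \frac{131.01}{10.91} \approx 12.0
\end{aligned}
```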

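Afterword: the Presto project has evidently recognized this problem as well. Multi-coordinator clusters are now on the roadmap, but no released version supports them yet.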

© Copyright belongs to the author. For reprints or content partnerships, please contact the author.
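Platform statement: the content of this article (including any images or videos) was uploaded and published by the author and represents only the author's own views. Jianshu is an information publishing platform and provides only information storage services.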
