Presto多协调者实现

写在前面:
Presto 是facebook开发的一款开源大数据OLAP引擎,基于内存计算和作业分发,直接使用sql分析,不需要数据预处理,类似于impala和sparksql,而且它本身不保存任何数据,通过连接数据源取数。

在Presto的架构中,主要有几种角色

Coordinator:
Coordinator服务器是用来解析语句,执行计划分析和管理Presto的worker结点。Presto安装必须有一个Coordinator和多个worker。
Coordinator跟踪每个work的活动情况并协调查询语句的执行。 Coordinator为每个查询建立模型,模型包含多个stage,每个stage再转为task分发到不同的worker上执行。Coordinator再做结果的聚合。

Worker:
Worker是负责执行任务和处理数据。

Discovery Server:
通常内嵌于coordinator节点中,也可以单独部署,用于Worker的注册,让Coordinator知道Worker有多少,地址是啥

问题:
前面提到,presto集群是基于内存和作业并行处理来执行分析任务,对于机器不多的单个集群来讲,并发很低,不过这倒是次要的,很多情况下,会出现cpu和内存打满的情形,这就直接影响到了可用性(最主要的原因是穷,机器的硬件都是低配且机器只有三台),而最为明显的瓶颈就首先出现在了Coordinator上,多条查询语句并发执行时,有一些直接被阻塞了,而且内存直接打满,通过jstat命令查看,GC就没停下来过,但是Worker的话,都还好

屏幕截图 2021-03-31 162921.jpg

开始我的设想是,增加一台协调者,使用集群的Discovery Server,然后使用nginx做负载,将请求分流到两台协调者上,以减轻Coordinator压力同时增强并发能力

nginx.conf

   upstream backend {
      server 10.100.218.110:8090;
      server 10.101.126.93:8090;
    }


    server {
        listen       8060;
        server_name  tt;

        location / {
            root   html;
            index  index.html index.htm;
            proxy_pass http://backend;
            proxy_redirect off;
            proxy_set_header Host $http_host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        }

        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }

    }

Coordinator-1(最开始的协调者) config:

coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8090
discovery-server.enabled=true
discovery.uri=http://10.100.218.110:8090
query.max-memory-per-node=6GB
query.max-total-memory-per-node=8GB
query.low-memory-killer.policy=total-reservation
http-server.max-request-header-size=32MB
http-server.max-response-header-size=32MB
task.concurrency=16

Coordinator-2(新加入的协调者) config:

coordinator=true
node-scheduler.include-coordinator=false
http-server.http.port=8090
discovery-server.enabled=false
discovery.uri=http://10.100.218.110:8090
query.max-memory-per-node=4GB
query.max-total-memory-per-node=5GB
query.low-memory-killer.policy=total-reservation
http-server.max-request-header-size=32MB
http-server.max-response-header-size=32MB
task.concurrency=16

但是,出问题了,所有查询都失败,原因是presto-jdbc执行查询请求并不只是发一个http请求就完事儿,而是会有多个请求去发送预处理,赋值,查询队列状况,获取执行结果等等。通过Fiddler抓包的结果也验证了这一点

企业微信截图_16171750768073.png

要想解决这个问题就必须将这些请求路由到同一个Coordinator节点上,我预先设想的是在presto-jdbc驱动发送请求的时候拦截下来,然后在Header里面强行塞线程id,然后nginx再根据每个请求的线程id来求模做路由

后面发现,在应用端做路由也是可行的,就是注入两个数据源,获取连接的时候根据线程id求模来决定走哪个连接

PrestoRouteDataSource (多数据源实现)


import com.zaxxer.hikari.HikariDataSource;

import javax.sql.DataSource;
import java.io.PrintWriter;
import java.sql.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Logger;


public class PrestoRouteDataSource implements DataSource {


    private CopyOnWriteArrayList<DataSource> corAddr = new CopyOnWriteArrayList<DataSource>();


    public PrestoRouteDataSource(String addr) {

            String[] split = addr.split(";");
            for (String s : split) {
                HikariDataSource hikariDataSource = new HikariDataSource();
                hikariDataSource.setJdbcUrl(s);
                hikariDataSource.setUsername("root");
                hikariDataSource.setDriverClassName("com.facebook.presto.jdbc.PrestoDriver");
                hikariDataSource.setPoolName("presto:" + s);
                corAddr.add(hikariDataSource);
            }

    }

    private DataSource getPrestoRouteDataSource(){
        long sessionId =  Thread.currentThread().getId();
        String as = String.valueOf(sessionId);
        int subIndex = (as.length()>=4)?(as.length()-3):(as.length()-1);
        int delSessionId =  Integer.parseInt(as.substring(subIndex));
        int index = delSessionId % corAddr.size() ;
        DataSource dataSource = corAddr.get(index);
        return dataSource ;
    }

    @Override
    public Connection getConnection() throws SQLException {


        return getPrestoRouteDataSource().getConnection();
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        return getPrestoRouteDataSource().getConnection();
    }

    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        throw new UnsupportedOperationException();
    }

    @Override
    public PrintWriter getLogWriter() throws SQLException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void setLogWriter(PrintWriter out) throws SQLException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void setLoginTimeout(int seconds) throws SQLException {
        throw new UnsupportedOperationException();
    }

    @Override
    public int getLoginTimeout() throws SQLException {
        throw new UnsupportedOperationException();
    }

    @Override
    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
        throw new UnsupportedOperationException();
    }
}


数据源配置


import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;

import javax.sql.DataSource;
import java.io.IOException;
import java.util.*;


@Configuration
@MapperScan(basePackages = {"lvye.java.datamiddle.dao.presto"}, sqlSessionFactoryRef = "prestoSqlSessionFactory")
public class PerstoDataSourceConfig {

    @Value("${presto.jdbc-url}")
    private String url;

    @Bean(name = "prestoDataSource")
    public DataSource prestoDataSource() {
        return new PrestoRouteDataSource(url);

    }

    @Bean(name = "prestoSqlSessionFactory")
    public SqlSessionFactory mallSqlSessionFactory(@Qualifier("prestoDataSource") DataSource mallDataSource)
            throws Exception {
        final SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
        sessionFactory.setTypeAliasesPackage("lvye.java.datamiddle.model.entity");
        sessionFactory.setDataSource(mallDataSource);
        sessionFactory.setMapperLocations(resolveMapperLocations());
        try {
            //开启驼峰命名转换
            Objects.requireNonNull(sessionFactory.getObject()).getConfiguration().setMapUnderscoreToCamelCase(true);
            sessionFactory.getObject().getConfiguration().setJdbcTypeForNull(null);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
        return sessionFactory.getObject();
    }

    private Resource[] resolveMapperLocations() {
        ResourcePatternResolver resourceResolver = new PathMatchingResourcePatternResolver();
        List<String> mapperLocations = new ArrayList<>();
        mapperLocations.add("classpath*:/mapper/presto/*.xml");
        List<Resource> resources = new ArrayList<>();
        for (String mapperLocation : mapperLocations) {
            try {
                Resource[] mappers = resourceResolver.getResources(mapperLocation);
                resources.addAll(Arrays.asList(mappers));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return resources.toArray(new Resource[0]);
    }


    @Bean(name = "prestoTransactionManager")
    public DataSourceTransactionManager mallTransactionManager(@Qualifier("prestoDataSource") DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

    @Bean(name = "prestoTemplate")
    public SqlSessionTemplate prestoJdbcTemplate(@Qualifier("prestoSqlSessionFactory") SqlSessionFactory sqlSessionFactory) {
        return new SqlSessionTemplate(sqlSessionFactory);
    }

}


application.properties

presto.jdbc-url=jdbc:presto://10.100.218.110:8090/kudu/tclydm;jdbc:presto://10.101.126.93:8090/kudu/tclydm

测试1(多协调者)


multic.jpg

测试2(单协调者)


single.jpg

性能对比:
(多协调者)总耗时 37.66s 平均耗时:3.13s 最大耗时:6.67s 最小耗时:764ms

(单协调者)总耗时 131.01s 平均耗时:10.91s 最大耗时:15.78s 最小耗时:1.6s

写在后面:presto官方显然也意识到了这个问题,多协调者集群目前加入了路线图,目前版本还没有发布

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350

推荐阅读更多精彩内容