7. sharding-jdbc源码之group by结果合并(2)

阿飞Javaer,转载请注明原创出处,谢谢!

sharding-jdbc源码之group by结果合并(1)中主要分析了sharding-jdbc如何在GroupByStreamResultSetMergerGroupByMemoryResultSetMerger中选择,并分析了GroupByStreamResultSetMerger的实现;接下来分析GroupByMemoryResultSetMerger的实现原理;

通过sharding-jdbc源码之group by结果合并(1)的分析可知,如果要走GroupByMemoryResultSetMerger,那么需要这样的SQL:SELECT o.status, count(o.user_id) count_user_id FROM t_order o where o.user_id=10 group by o.status order by count_user_id asc,即group by和order by的字段不一样;接下来的分析都是基于这条SQL;

ExecutorEngine.build()方法中通过return new GroupByMemoryResultSetMerger(columnLabelIndexMap, resultSets, selectStatement);调用GroupByMemoryResultSetMerger,GroupByMemoryResultSetMerger的构造方法源码如下:

public GroupByMemoryResultSetMerger(
        final Map<String, Integer> labelAndIndexMap, final List<ResultSet> resultSets, final SelectStatement selectStatement) throws SQLException {
    // labelAndIndexMap就是select结果列与位置索引的map,例如{count_user_id:2, status:1}
    super(labelAndIndexMap);
    // select查询语句
    this.selectStatement = selectStatement;
    // resultSets就是并发在多个实际表执行返回的结果集合,在多少个实际表上执行,resultSets的size就有多大;
    memoryResultSetRows = init(resultSets);
}

在实际表t_order_0和t_order_1上执行SQL返回的结果如下:


t_order_0和t_order_1结果.png

知道实际表的返回结果后,后面的分析更容易理解;假定这些返回结果用json表示为:{[{"status":"NEW", "count_user_id":1},{"status":"VALID", "count_user_id":1},{"status":INIT, "count_user_id":2}],[{"status":"VALID", "count_user_id":1},{"status":"INIT", "count_user_id":1},{"status":""NEW, "count_user_id":3}]}

init()方法源码如下:

private Iterator<MemoryResultSetRow> init(final List<ResultSet> resultSets) throws SQLException {
    Map<GroupByValue, MemoryResultSetRow> dataMap = new HashMap<>(1024);
    Map<GroupByValue, Map<AggregationSelectItem, AggregationUnit>> aggregationMap = new HashMap<>(1024);
    // 遍历多个实际表执行返回的结果集合中所有的结果,即2个实际表每个实际表3条结果,总计6条结果
    for (ResultSet each : resultSets) {
        while (each.next()) {
            // each就是遍历过程中的一条结果,selectStatement.getGroupByItems()即group by项,即status,将结果和group by项组成一个GroupByValue对象--实际是从ResultSet中取出group by项的值,例如NEW,VALID,INIT等
            GroupByValue groupByValue = new GroupByValue(each, selectStatement.getGroupByItems());
            // initForFirstGroupByValue()分析如下
            initForFirstGroupByValue(each, groupByValue, dataMap, aggregationMap);
            aggregate(each, groupByValue, aggregationMap);
        }
    }
    // 将aggregationMap中的聚合计算结果封装到dataMap中
    setAggregationValueToMemoryRow(dataMap, aggregationMap);
    // 将结果转换成List<MemoryResultSetRow>形式
    List<MemoryResultSetRow> result = getMemoryResultSetRows(dataMap);
    if (!result.isEmpty()) {
        // 如果有结果,再将currentResultSetRow置为List<MemoryResultSetRow>的第一个元素
        setCurrentResultSetRow(result.get(0));
    }
    // 返回List<MemoryResultSetRow>的迭代器,后面的取结果,实际上就是迭代这个集合;
    return result.iterator();
}   

initForFirstGroupByValue()源码如下:

private void initForFirstGroupByValue(final ResultSet resultSet, final GroupByValue groupByValue, final Map<GroupByValue, MemoryResultSetRow> dataMap, 
                                      final Map<GroupByValue, Map<AggregationSelectItem, AggregationUnit>> aggregationMap) throws SQLException {
    // groupByValue如果是第一次出现,那么在dataMap中初始化一条数据,key就是groupByValue,例如NEW;value就是new MemoryResultSetRow(resultSet),即将ResultSet中的结果取出来封装到MemoryResultSetRow中,MemoryResultSetRow实际就一个属性Object[] data,那么data值就是这样的["NEW", 1]                              
    if (!dataMap.containsKey(groupByValue)) {
        dataMap.put(groupByValue, new MemoryResultSetRow(resultSet));
    }
    // groupByValue如果是第一次出现,那么在aggregationMap中初始化一条数据,key就是groupByValue,例如NEW;value又是一个map,这个map的key就是select中有聚合计算的列,例如count(user_id),即count_user_id;value就是AggregationUnit的实现,count聚合计算的实现是AccumulationAggregationUnit
    if (!aggregationMap.containsKey(groupByValue)) {
        Map<AggregationSelectItem, AggregationUnit> map = Maps.toMap(selectStatement.getAggregationSelectItems(), new Function<AggregationSelectItem, AggregationUnit>() {
            @Override
            public AggregationUnit apply(final AggregationSelectItem input) {
                // 根据聚合计算类型得到AggregationUnit的实现
                return AggregationUnitFactory.create(input.getType());
            }
        });
        aggregationMap.put(groupByValue, map);
    }
}

该方法都是为了接下来的聚合计算做准备工作;

aggregate()源码如下--即在内存中将多个实际表中返回的结果进行聚合:

private void aggregate(final ResultSet resultSet, final GroupByValue groupByValue, final Map<GroupByValue, Map<AggregationSelectItem, AggregationUnit>> aggregationMap) throws SQLException {
    // 遍历select中所有的聚合类型,例如COUNT(o.user_id)
    for (AggregationSelectItem each : selectStatement.getAggregationSelectItems()) {
        List<Comparable<?>> values = new ArrayList<>(2);
        if (each.getDerivedAggregationSelectItems().isEmpty()) {
            values.add(getAggregationValue(resultSet, each));
        } else {
            for (AggregationSelectItem derived : each.getDerivedAggregationSelectItems()) {
                values.add(getAggregationValue(resultSet, derived));
            }
        }
        // 通过AggregationUnit实现类即AccumulationAggregationUnit进行聚合,实际上就是聚合本次遍历到的ResultSet,聚合的临时结果就在AccumulationAggregationUnit的属性result中(AccumulationAggregationUnit聚合的本质就是累加)
        aggregationMap.get(groupByValue).get(each).merge(values);
    }
}

经过for (ResultSet each : resultSets) { while (each.next()) { ... 遍历所有结果并聚合计算后,aggregationMap这个map中已经聚合计算完后的结果,如下所示:

{
    "VALID": {
        "COUNT(user_id)": 2
    },
    "INIT": {
        "COUNT(user_id)": 5
    },
    "NEW": {
        "COUNT(user_id)": 3
    }
}

再将aggregationMap中的结果封装到Map<GroupByValue, MemoryResultSetRow> dataMap这个map中,结果形式如下所示:

{
    "VALID": ["VALID", 2],
    "INIT": ["INIT", 5],
    "NEW": ["NEW", 3]
}

MemoryResultSetRow的本质就是一个Object[] data,所以其值是["VALID", 2],["INIT", 5]这种形式

将结果转成List<MemoryResultSetRow>,并且排序--如果有order by,那么根据order by的值进行排序,否则根据group by的值排序:

private List<MemoryResultSetRow> getMemoryResultSetRows(final Map<GroupByValue, MemoryResultSetRow> dataMap) {
    List<MemoryResultSetRow> result = new ArrayList<>(dataMap.values());
    Collections.sort(result, new GroupByRowComparator(selectStatement));
    return result;
}
@RequiredArgsConstructor
public final class GroupByRowComparator implements Comparator<MemoryResultSetRow> {
    
    private final SelectStatement selectStatement;
    
    @Override
    public int compare(final MemoryResultSetRow o1, final MemoryResultSetRow o2) {
        if (!selectStatement.getOrderByItems().isEmpty()) {
            return compare(o1, o2, selectStatement.getOrderByItems());
        }
        return compare(o1, o2, selectStatement.getGroupByItems());
    }
    ...
}   

到这里,GroupByMemoryResultSetMerger即内存GROUP聚合计算已经分析完成,依旧通过运行过程图解加深对GroupByMemoryResultSetMerger的理解,运行过程图如下图所示:

image.png

image.png

总结

正如GroupByMemoryResultSetMerger的名字一样,其实现原理是把所有结果加载到内存中,在内存中进行计算,而GroupByMemoryResultSetMerger是流式计算方法,并不需要加载所有实际表返回的结果到内存中。这样的话,如果SQL返回的总结果数比较多,GroupByMemoryResultSetMerger的处理方式就可能会撑爆内存;这个是使用sharding-jdbc一个非常需要注意的地方;

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,335评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,895评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,766评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,918评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,042评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,169评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,219评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,976评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,393评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,711评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,876评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,562评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,193评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,903评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,699评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,764评论 2 351

推荐阅读更多精彩内容

  • 阿飞Javaer,转载请注明原创出处,谢谢! 在5. sharding-jdbc源码之结果合并中已经分析了Orde...
    阿飞的博客阅读 2,728评论 2 2
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,644评论 18 139
  • 一. Java基础部分.................................................
    wy_sure阅读 3,807评论 0 11
  • 复杂设计的含义 在习惯了密斯的金科玉律 “ 少即是多 ” 后,设计师对 “ 复杂 ” 噤若寒蝉,普遍认为复杂的设计...
    走过路过的路人甲阅读 486评论 0 0
  • 新的一天从晨起发愿开始: 今天,愿所有见我面的,听我声的,闻我名的所有众生,都因我而喜悦祥和,心得清净,少...
    小太阳Monica阅读 238评论 0 2