Spark 2.3.0 Test Notes, Part 1: Shuffled Until My Stomach Aches

1 Preface

We have recently been planning to bump our production Spark baseline from the 2.1.x line up to the 2.3 line, and to maintain a relatively stable production version on top of it, in order to meet users' ever-growing demands for new features and better performance.

2 Test Data

benchmark   scale   fileformat   partitioned   link
TPCDS       1T      parquet      true          https://github.com/yaooqinn/tpcds-for-spark

3 Test Subjects

3.1 Control Group

baseline   modified   commit    link
2.1.2      true       9ef23ae   https://github.com/yaooqinn/spark/tree/v2.1.2-based

3.2 Experimental Group

baseline   modified   commit    link
2.3.0      false      992447f   https://github.com/yaooqinn/spark/tree/v2.3.0-based

4 Configuration

4.1 Hardware

Type              CPU           Memory   Disk        Count   Services
Virtual machine   4 × 1 × 1     32g      500g × 1    5       NN(1) SNN(1) RM(1) HMS(1) SHS(1)
Physical server   48 × 2 × 12   256g     7.3T × 12   3       DN(3) NM(3)
Physical server   48 × 2 × 12   256g     1.1T × 1    4       DN(4) NM(4)
Physical server   32 × 2 × 8    128g     3.6T × 12   1       DN(1) NM(1)
Physical server   40 × 2 × 10   256g     500g × 1    1       NM(1)
Physical server   48 × 2 × 12   32g      1.1T × 1    1       DN(1)

Note: this deliberately heterogeneous hardware mix simulates a more realistic environment. Running shuffle on the single-disk physical servers also mimics the IO bottlenecks that can occur in production. None of this affects the overall comparison between the two groups.

4.2 metrics

hdfs
yarn

4.3 SparkConf

## Basic Settings ##
spark.master                                          yarn
spark.submit.deployMode                               client
spark.serializer                                      org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max                       256m
spark.local.dir                                       ./local

## Hadoop Settings ##
spark.hadoop.fs.hdfs.impl.disable.cache               true
spark.hadoop.fs.file.impl.disable.cache               true

## Driver/AM Settings ##
spark.yarn.am.cores                                   2
spark.yarn.am.memory                                  2g
spark.yarn.am.memoryOverhead                          512
spark.yarn.am.extraJavaOptions                        -XX:PermSize=1024m -XX:MaxPermSize=2048m -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution
spark.driver.maxResultSize                            2g
spark.driver.memory                                   30g
spark.driver.extraJavaOptions                         -XX:PermSize=1024m -XX:MaxPermSize=1024m

## Executor Settings ##
spark.executor.instances                              40
spark.executor.cores                                  4
spark.executor.memory                                 20g
spark.executor.memoryOverhead                         4096
spark.executor.extraJavaOptions                       -XX:PermSize=1024m -XX:MaxPermSize=1024m -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution

## Security Settings ##
spark.yarn.keytab                                     ?
spark.yarn.principal                                  ?

## Dynamic Allocation Settings ##
spark.shuffle.service.enabled                         true
spark.dynamicAllocation.enabled                       false
spark.dynamicAllocation.initialExecutors              1
spark.dynamicAllocation.minExecutors                  1
spark.dynamicAllocation.maxExecutors                  50

## SQL Configurations ##
spark.sql.autoBroadcastJoinThreshold                  204857600
spark.sql.warehouse.dir                               /user/spark/warehouse
# spark.sql.hive.convertCTAS                            true
# spark.sql.sources.default                             parquet
spark.sql.shuffle.partitions                          1024
# spark.sql.hive.convertMetastoreParquet                false
spark.sql.crossJoin.enabled                           true

## History Server Client Settings ##
# spark.eventLog.enabled                                true
spark.eventLog.compress                               true
spark.eventLog.dir                                    hdfs:///spark2-history/
spark.yarn.historyServer.address                      ?:18081

5 Test Results

5.1 Experimental Data

How to read the results table:
  1. Column 1: SQL statement
  2. Column 2: control group result
  3. Column 3: experimental group result
  4. Column 4: (experimental - control) / control
  5. Green: column 4 < 0, performance improved
  6. Pink: 0 <= column 4 < 1, performance regressed
  7. Dark red: column 4 > 1, performance down, down, down

5.2 Qualitative Results

  1. Disk-IO alerts, utilization, load, and so on: under heavy shuffle the single-disk machines get overloaded and fire the corresponding alerts. The control group only triggered such alerts on a handful of queries like query14; the experimental group produced enough alerts overnight to make sleep impossible.
  2. query72 in the experimental group was killed manually after shuffling for an entire day without producing a result. Because of data skew, with spark.sql.shuffle.partitions=1024 only about 10 tasks were actually computing; each had to process terabyte-scale data and was effectively stuck, while all the others spun through in milliseconds with nothing to do (one way to inspect the skewed keys is sketched after this list). The control group finished the query in a bit over 400 seconds.
  3. Performance regressed across the board. Many causes are possible, but one thing is certain: with this much shuffle, the Broadcast Joins must have degenerated into SortMergeJoins.
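
Incidentally, a quick way to confirm the skew hypothesis from item 2 is to look at the key distribution directly. A minimal sketch against one of the shuffle keys visible in query72's plan (any of the other keys works the same way; this is a diagnostic sketch, not part of the benchmark):

// Hypothetical skew check: count rows per join key and inspect the heavy hitters.
spark.sql(
  """SELECT cs_item_sk, count(*) AS cnt
    |FROM tpcds_1t_spark230.catalog_sales
    |GROUP BY cs_item_sk
    |ORDER BY cnt DESC
    |LIMIT 20
  """.stripMargin).show()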

6 Analysis

We take query72 as the case for analysis.

6.1 Control Group Physical Plan

== Physical Plan ==
TakeOrderedAndProject(limit=100, orderBy=[total_cnt#3L DESC NULLS LAST,i_item_desc#62 ASC NULLS FIRST,w_warehouse_name#46 ASC NULLS FIRST,d_week_seq#98L ASC NULLS FIRST], output=[i_item_desc#62,w_warehouse_name#46,d_week_seq#98L,no_promo#1L,promo#2L,total_cnt#3L])
+- *HashAggregate(keys=[i_item_desc#62, w_warehouse_name#46, d_week_seq#98L], functions=[sum(cast(CASE WHEN isnull(p_promo_sk#178L) THEN 1 ELSE 0 END as bigint)), sum(cast(CASE WHEN isnotnull(p_promo_sk#178L) THEN 1 ELSE 0 END as bigint)), count(1)])
   +- Exchange hashpartitioning(i_item_desc#62, w_warehouse_name#46, d_week_seq#98L, 1024)
      +- *HashAggregate(keys=[i_item_desc#62, w_warehouse_name#46, d_week_seq#98L], functions=[partial_sum(cast(CASE WHEN isnull(p_promo_sk#178L) THEN 1 ELSE 0 END as bigint)), partial_sum(cast(CASE WHEN isnotnull(p_promo_sk#178L) THEN 1 ELSE 0 END as bigint)), partial_count(1)])
         +- *Project [w_warehouse_name#46, i_item_desc#62, d_week_seq#98L, p_promo_sk#178L]
            +- SortMergeJoin [cs_item_sk#21L, cs_order_number#23L], [cr_item_sk#199L, cr_order_number#214L], LeftOuter
               :- *Sort [cs_item_sk#21L ASC NULLS FIRST, cs_order_number#23L ASC NULLS FIRST], false, 0
               :  +- Exchange hashpartitioning(cs_item_sk#21L, cs_order_number#23L, 1024)
               :     +- *Project [cs_item_sk#21L, cs_order_number#23L, w_warehouse_name#46, i_item_desc#62, d_week_seq#98L, p_promo_sk#178L]
               :        +- *BroadcastHashJoin [cs_promo_sk#22L], [p_promo_sk#178L], LeftOuter, BuildRight
               :           :- *Project [cs_item_sk#21L, cs_promo_sk#22L, cs_order_number#23L, w_warehouse_name#46, i_item_desc#62, d_week_seq#98L]
               :           :  +- *BroadcastHashJoin [cs_ship_date_sk#8L], [d_date_sk#150L], Inner, BuildRight, (cast(d_date#152 as double) > (cast(d_date#96 as double) + 5.0))
               :           :     :- *Project [cs_ship_date_sk#8L, cs_item_sk#21L, cs_promo_sk#22L, cs_order_number#23L, w_warehouse_name#46, i_item_desc#62, d_date#96, d_week_seq#98L]
               :           :     :  +- *BroadcastHashJoin [d_week_seq#98L, inv_date_sk#40L], [d_week_seq#126L, d_date_sk#122L], Inner, BuildRight
               :           :     :     :- *Project [cs_ship_date_sk#8L, cs_item_sk#21L, cs_promo_sk#22L, cs_order_number#23L, inv_date_sk#40L, w_warehouse_name#46, i_item_desc#62, d_date#96, d_week_seq#98L]
               :           :     :     :  +- *BroadcastHashJoin [cs_sold_date_sk#6L], [d_date_sk#94L], Inner, BuildRight
               :           :     :     :     :- *Project [cs_ship_date_sk#8L, cs_item_sk#21L, cs_promo_sk#22L, cs_order_number#23L, cs_sold_date_sk#6L, inv_date_sk#40L, w_warehouse_name#46, i_item_desc#62]
               :           :     :     :     :  +- *BroadcastHashJoin [cs_bill_hdemo_sk#11L], [hd_demo_sk#89L], Inner, BuildRight
               :           :     :     :     :     :- *Project [cs_ship_date_sk#8L, cs_bill_hdemo_sk#11L, cs_item_sk#21L, cs_promo_sk#22L, cs_order_number#23L, cs_sold_date_sk#6L, inv_date_sk#40L, w_warehouse_name#46, i_item_desc#62]
               :           :     :     :     :     :  +- *BroadcastHashJoin [cs_bill_cdemo_sk#10L], [cd_demo_sk#80L], Inner, BuildRight
               :           :     :     :     :     :     :- *Project [cs_ship_date_sk#8L, cs_bill_cdemo_sk#10L, cs_bill_hdemo_sk#11L, cs_item_sk#21L, cs_promo_sk#22L, cs_order_number#23L, cs_sold_date_sk#6L, inv_date_sk#40L, w_warehouse_name#46, i_item_desc#62]
               :           :     :     :     :     :     :  +- *BroadcastHashJoin [cs_item_sk#21L], [i_item_sk#58L], Inner, BuildRight
               :           :     :     :     :     :     :     :- *Project [cs_ship_date_sk#8L, cs_bill_cdemo_sk#10L, cs_bill_hdemo_sk#11L, cs_item_sk#21L, cs_promo_sk#22L, cs_order_number#23L, cs_sold_date_sk#6L, inv_date_sk#40L, w_warehouse_name#46]
               :           :     :     :     :     :     :     :  +- *BroadcastHashJoin [inv_warehouse_sk#42L], [w_warehouse_sk#44L], Inner, BuildRight
               :           :     :     :     :     :     :     :     :- *Project [cs_ship_date_sk#8L, cs_bill_cdemo_sk#10L, cs_bill_hdemo_sk#11L, cs_item_sk#21L, cs_promo_sk#22L, cs_order_number#23L, cs_sold_date_sk#6L, inv_warehouse_sk#42L, inv_date_sk#40L]
               :           :     :     :     :     :     :     :     :  +- *SortMergeJoin [cs_item_sk#21L], [inv_item_sk#41L], Inner, (inv_quantity_on_hand#43L < cs_quantity#24L)
               :           :     :     :     :     :     :     :     :     :- *Sort [cs_item_sk#21L ASC NULLS FIRST], false, 0
               :           :     :     :     :     :     :     :     :     :  +- Exchange hashpartitioning(cs_item_sk#21L, 1024)
               :           :     :     :     :     :     :     :     :     :     +- *Project [cs_ship_date_sk#8L, cs_bill_cdemo_sk#10L, cs_bill_hdemo_sk#11L, cs_item_sk#21L, cs_promo_sk#22L, cs_order_number#23L, cs_quantity#24L, cs_sold_date_sk#6L]
               :           :     :     :     :     :     :     :     :     :        +- *Filter ((((isnotnull(cs_quantity#24L) && isnotnull(cs_item_sk#21L)) && isnotnull(cs_bill_cdemo_sk#10L)) && isnotnull(cs_bill_hdemo_sk#11L)) && isnotnull(cs_ship_date_sk#8L))
               :           :     :     :     :     :     :     :     :     :           +- *FileScan parquet tpcds_1t_spark230.catalog_sales[cs_ship_date_sk#8L,cs_bill_cdemo_sk#10L,cs_bill_hdemo_sk#11L,cs_item_sk#21L,cs_promo_sk#22L,cs_order_number#23L,cs_quantity#24L,cs_sold_date_sk#6L] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_..., PartitionCount: 1836, PartitionFilters: [isnotnull(cs_sold_date_sk#6L)], PushedFilters: [IsNotNull(cs_quantity), IsNotNull(cs_item_sk), IsNotNull(cs_bill_cdemo_sk), IsNotNull(cs_bill_hd..., ReadSchema: struct<cs_ship_date_sk:bigint,cs_bill_cdemo_sk:bigint,cs_bill_hdemo_sk:bigint,cs_item_sk:bigint,c...
               :           :     :     :     :     :     :     :     :     +- *Sort [inv_item_sk#41L ASC NULLS FIRST], false, 0
               :           :     :     :     :     :     :     :     :        +- Exchange hashpartitioning(inv_item_sk#41L, 1024)
               :           :     :     :     :     :     :     :     :           +- *Project [inv_item_sk#41L, inv_warehouse_sk#42L, inv_quantity_on_hand#43L, inv_date_sk#40L]
               :           :     :     :     :     :     :     :     :              +- *Filter ((isnotnull(inv_quantity_on_hand#43L) && isnotnull(inv_item_sk#41L)) && isnotnull(inv_warehouse_sk#42L))
               :           :     :     :     :     :     :     :     :                 +- *FileScan parquet tpcds_1t_spark230.inventory[inv_item_sk#41L,inv_warehouse_sk#42L,inv_quantity_on_hand#43L,inv_date_sk#40L] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_..., PartitionCount: 261, PartitionFilters: [isnotnull(inv_date_sk#40L)], PushedFilters: [IsNotNull(inv_quantity_on_hand), IsNotNull(inv_item_sk), IsNotNull(inv_warehouse_sk)], ReadSchema: struct<inv_item_sk:bigint,inv_warehouse_sk:bigint,inv_quantity_on_hand:bigint>
               :           :     :     :     :     :     :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
               :           :     :     :     :     :     :     :        +- *Project [w_warehouse_sk#44L, w_warehouse_name#46]
               :           :     :     :     :     :     :     :           +- *Filter isnotnull(w_warehouse_sk#44L)
               :           :     :     :     :     :     :     :              +- *FileScan parquet tpcds_1t_spark230.warehouse[w_warehouse_sk#44L,w_warehouse_name#46] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark2..., PartitionFilters: [], PushedFilters: [IsNotNull(w_warehouse_sk)], ReadSchema: struct<w_warehouse_sk:bigint,w_warehouse_name:string>
               :           :     :     :     :     :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
               :           :     :     :     :     :     :        +- *Project [i_item_sk#58L, i_item_desc#62]
               :           :     :     :     :     :     :           +- *Filter isnotnull(i_item_sk#58L)
               :           :     :     :     :     :     :              +- *FileScan parquet tpcds_1t_spark230.item[i_item_sk#58L,i_item_desc#62] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark2..., PartitionFilters: [], PushedFilters: [IsNotNull(i_item_sk)], ReadSchema: struct<i_item_sk:bigint,i_item_desc:string>
               :           :     :     :     :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
               :           :     :     :     :     :        +- *Project [cd_demo_sk#80L]
               :           :     :     :     :     :           +- *Filter ((isnotnull(cd_marital_status#82) && (cd_marital_status#82 = M)) && isnotnull(cd_demo_sk#80L))
               :           :     :     :     :     :              +- *FileScan parquet tpcds_1t_spark230.customer_demographics[cd_demo_sk#80L,cd_marital_status#82] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark2..., PartitionFilters: [], PushedFilters: [IsNotNull(cd_marital_status), EqualTo(cd_marital_status,M), IsNotNull(cd_demo_sk)], ReadSchema: struct<cd_demo_sk:bigint,cd_marital_status:string>
               :           :     :     :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
               :           :     :     :     :        +- *Project [hd_demo_sk#89L]
               :           :     :     :     :           +- *Filter ((isnotnull(hd_buy_potential#91) && (hd_buy_potential#91 = 1001-5000)) && isnotnull(hd_demo_sk#89L))
               :           :     :     :     :              +- *FileScan parquet tpcds_1t_spark230.household_demographics[hd_demo_sk#89L,hd_buy_potential#91] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark2..., PartitionFilters: [], PushedFilters: [IsNotNull(hd_buy_potential), EqualTo(hd_buy_potential,1001-5000), IsNotNull(hd_demo_sk)], ReadSchema: struct<hd_demo_sk:bigint,hd_buy_potential:string>
               :           :     :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
               :           :     :     :        +- *Project [d_date_sk#94L, d_date#96, d_week_seq#98L]
               :           :     :     :           +- *Filter ((((isnotnull(d_year#100L) && (d_year#100L = 2001)) && isnotnull(d_date_sk#94L)) && isnotnull(d_week_seq#98L)) && isnotnull(d_date#96))
               :           :     :     :              +- *FileScan parquet tpcds_1t_spark230.date_dim[d_date_sk#94L,d_date#96,d_week_seq#98L,d_year#100L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark2..., PartitionFilters: [], PushedFilters: [IsNotNull(d_year), EqualTo(d_year,2001), IsNotNull(d_date_sk), IsNotNull(d_week_seq), IsNotNull(..., ReadSchema: struct<d_date_sk:bigint,d_date:string,d_week_seq:bigint,d_year:bigint>
               :           :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, bigint, true], input[0, bigint, true]))
               :           :     :        +- *Project [d_date_sk#122L, d_week_seq#126L]
               :           :     :           +- *Filter (isnotnull(d_date_sk#122L) && isnotnull(d_week_seq#126L))
               :           :     :              +- *FileScan parquet tpcds_1t_spark230.date_dim[d_date_sk#122L,d_week_seq#126L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark2..., PartitionFilters: [], PushedFilters: [IsNotNull(d_date_sk), IsNotNull(d_week_seq)], ReadSchema: struct<d_date_sk:bigint,d_week_seq:bigint>
               :           :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
               :           :        +- *Project [d_date_sk#150L, d_date#152]
               :           :           +- *Filter (isnotnull(d_date#152) && isnotnull(d_date_sk#150L))
               :           :              +- *FileScan parquet tpcds_1t_spark230.date_dim[d_date_sk#150L,d_date#152] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark2..., PartitionFilters: [], PushedFilters: [IsNotNull(d_date), IsNotNull(d_date_sk)], ReadSchema: struct<d_date_sk:bigint,d_date:string>
               :           +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
               :              +- *FileScan parquet tpcds_1t_spark230.promotion[p_promo_sk#178L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark2..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<p_promo_sk:bigint>
               +- *Sort [cr_item_sk#199L ASC NULLS FIRST, cr_order_number#214L ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(cr_item_sk#199L, cr_order_number#214L, 1024)
                     +- *Project [cr_item_sk#199L, cr_order_number#214L]
                        +- *FileScan parquet tpcds_1t_spark230.catalog_returns[cr_item_sk#199L,cr_order_number#214L,cr_returned_date_sk#197L] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark23..., PartitionCount: 2104, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<cr_item_sk:bigint,cr_order_number:bigint>

6.2 Experimental Group Physical Plan

== Physical Plan ==
TakeOrderedAndProject(limit=100, orderBy=[total_cnt#2L DESC NULLS LAST,i_item_desc#62 ASC NULLS FIRST,w_warehouse_name#46 ASC NULLS FIRST,d_week_seq#98L ASC NULLS FIRST], output=[i_item_desc#62,w_warehouse_name#46,d_week_seq#98L,no_promo#0L,promo#1L,total_cnt#2L])
+- *(42) HashAggregate(keys=[i_item_desc#62, w_warehouse_name#46, d_week_seq#98L], functions=[sum(cast(CASE WHEN isnull(p_promo_sk#178L) THEN 1 ELSE 0 END as bigint)), sum(cast(CASE WHEN isnotnull(p_promo_sk#178L) THEN 1 ELSE 0 END as bigint)), count(1)])
   +- Exchange hashpartitioning(i_item_desc#62, w_warehouse_name#46, d_week_seq#98L, 1024)
      +- *(41) HashAggregate(keys=[i_item_desc#62, w_warehouse_name#46, d_week_seq#98L], functions=[partial_sum(cast(CASE WHEN isnull(p_promo_sk#178L) THEN 1 ELSE 0 END as bigint)), partial_sum(cast(CASE WHEN isnotnull(p_promo_sk#178L) THEN 1 ELSE 0 END as bigint)), partial_count(1)])
         +- *(41) Project [w_warehouse_name#46, i_item_desc#62, d_week_seq#98L, p_promo_sk#178L]
            +- SortMergeJoin [cs_item_sk#20L, cs_order_number#22L], [cr_item_sk#198L, cr_order_number#213L], LeftOuter
               :- *(38) Sort [cs_item_sk#20L ASC NULLS FIRST, cs_order_number#22L ASC NULLS FIRST], false, 0
               :  +- Exchange hashpartitioning(cs_item_sk#20L, cs_order_number#22L, 1024)
               :     +- *(37) Project [cs_item_sk#20L, cs_order_number#22L, w_warehouse_name#46, i_item_desc#62, d_week_seq#98L, p_promo_sk#178L]
               :        +- SortMergeJoin [cs_promo_sk#21L], [p_promo_sk#178L], LeftOuter
               :           :- *(34) Sort [cs_promo_sk#21L ASC NULLS FIRST], false, 0
               :           :  +- Exchange hashpartitioning(cs_promo_sk#21L, 1024)
               :           :     +- *(33) Project [cs_item_sk#20L, cs_promo_sk#21L, cs_order_number#22L, w_warehouse_name#46, i_item_desc#62, d_week_seq#98L]
               :           :        +- *(33) SortMergeJoin [cs_ship_date_sk#7L], [d_date_sk#150L], Inner, (cast(d_date#152 as double) > (cast(d_date#96 as double) + 5.0))
               :           :           :- *(30) Sort [cs_ship_date_sk#7L ASC NULLS FIRST], false, 0
               :           :           :  +- Exchange hashpartitioning(cs_ship_date_sk#7L, 1024)
               :           :           :     +- *(29) Project [cs_ship_date_sk#7L, cs_item_sk#20L, cs_promo_sk#21L, cs_order_number#22L, w_warehouse_name#46, i_item_desc#62, d_date#96, d_week_seq#98L]
               :           :           :        +- *(29) SortMergeJoin [d_week_seq#98L, inv_date_sk#43L], [d_week_seq#126L, d_date_sk#122L], Inner
               :           :           :           :- *(26) Sort [d_week_seq#98L ASC NULLS FIRST, inv_date_sk#43L ASC NULLS FIRST], false, 0
               :           :           :           :  +- Exchange hashpartitioning(d_week_seq#98L, inv_date_sk#43L, 1024)
               :           :           :           :     +- *(25) Project [cs_ship_date_sk#7L, cs_item_sk#20L, cs_promo_sk#21L, cs_order_number#22L, inv_date_sk#43L, w_warehouse_name#46, i_item_desc#62, d_date#96, d_week_seq#98L]
               :           :           :           :        +- *(25) SortMergeJoin [cs_sold_date_sk#39L], [d_date_sk#94L], Inner
               :           :           :           :           :- *(22) Sort [cs_sold_date_sk#39L ASC NULLS FIRST], false, 0
               :           :           :           :           :  +- Exchange hashpartitioning(cs_sold_date_sk#39L, 1024)
               :           :           :           :           :     +- *(21) Project [cs_ship_date_sk#7L, cs_item_sk#20L, cs_promo_sk#21L, cs_order_number#22L, cs_sold_date_sk#39L, inv_date_sk#43L, w_warehouse_name#46, i_item_desc#62]
               :           :           :           :           :        +- *(21) SortMergeJoin [cs_bill_hdemo_sk#10L], [hd_demo_sk#89L], Inner
               :           :           :           :           :           :- *(18) Sort [cs_bill_hdemo_sk#10L ASC NULLS FIRST], false, 0
               :           :           :           :           :           :  +- Exchange hashpartitioning(cs_bill_hdemo_sk#10L, 1024)
               :           :           :           :           :           :     +- *(17) Project [cs_ship_date_sk#7L, cs_bill_hdemo_sk#10L, cs_item_sk#20L, cs_promo_sk#21L, cs_order_number#22L, cs_sold_date_sk#39L, inv_date_sk#43L, w_warehouse_name#46, i_item_desc#62]
               :           :           :           :           :           :        +- *(17) SortMergeJoin [cs_bill_cdemo_sk#9L], [cd_demo_sk#80L], Inner
               :           :           :           :           :           :           :- *(14) Sort [cs_bill_cdemo_sk#9L ASC NULLS FIRST], false, 0
               :           :           :           :           :           :           :  +- Exchange hashpartitioning(cs_bill_cdemo_sk#9L, 1024)
               :           :           :           :           :           :           :     +- *(13) Project [cs_ship_date_sk#7L, cs_bill_cdemo_sk#9L, cs_bill_hdemo_sk#10L, cs_item_sk#20L, cs_promo_sk#21L, cs_order_number#22L, cs_sold_date_sk#39L, inv_date_sk#43L, w_warehouse_name#46, i_item_desc#62]
               :           :           :           :           :           :           :        +- *(13) SortMergeJoin [cs_item_sk#20L], [i_item_sk#58L], Inner
               :           :           :           :           :           :           :           :- *(10) Sort [cs_item_sk#20L ASC NULLS FIRST], false, 0
               :           :           :           :           :           :           :           :  +- Exchange hashpartitioning(cs_item_sk#20L, 1024)
               :           :           :           :           :           :           :           :     +- *(9) Project [cs_ship_date_sk#7L, cs_bill_cdemo_sk#9L, cs_bill_hdemo_sk#10L, cs_item_sk#20L, cs_promo_sk#21L, cs_order_number#22L, cs_sold_date_sk#39L, inv_date_sk#43L, w_warehouse_name#46]
               :           :           :           :           :           :           :           :        +- *(9) SortMergeJoin [inv_warehouse_sk#41L], [w_warehouse_sk#44L], Inner
               :           :           :           :           :           :           :           :           :- *(6) Sort [inv_warehouse_sk#41L ASC NULLS FIRST], false, 0
               :           :           :           :           :           :           :           :           :  +- Exchange hashpartitioning(inv_warehouse_sk#41L, 1024)
               :           :           :           :           :           :           :           :           :     +- *(5) Project [cs_ship_date_sk#7L, cs_bill_cdemo_sk#9L, cs_bill_hdemo_sk#10L, cs_item_sk#20L, cs_promo_sk#21L, cs_order_number#22L, cs_sold_date_sk#39L, inv_warehouse_sk#41L, inv_date_sk#43L]
               :           :           :           :           :           :           :           :           :        +- *(5) SortMergeJoin [cs_item_sk#20L], [inv_item_sk#40L], Inner, (inv_quantity_on_hand#42L < cs_quantity#23L)
               :           :           :           :           :           :           :           :           :           :- *(2) Sort [cs_item_sk#20L ASC NULLS FIRST], false, 0
               :           :           :           :           :           :           :           :           :           :  +- Exchange hashpartitioning(cs_item_sk#20L, 1024)
               :           :           :           :           :           :           :           :           :           :     +- *(1) Project [cs_ship_date_sk#7L, cs_bill_cdemo_sk#9L, cs_bill_hdemo_sk#10L, cs_item_sk#20L, cs_promo_sk#21L, cs_order_number#22L, cs_quantity#23L, cs_sold_date_sk#39L]
               :           :           :           :           :           :           :           :           :           :        +- *(1) Filter ((((isnotnull(cs_item_sk#20L) && isnotnull(cs_quantity#23L)) && isnotnull(cs_bill_cdemo_sk#9L)) && isnotnull(cs_bill_hdemo_sk#10L)) && isnotnull(cs_ship_date_sk#7L))
               :           :           :           :           :           :           :           :           :           :           +- *(1) FileScan parquet tpcds_1t_spark230.catalog_sales[cs_ship_date_sk#7L,cs_bill_cdemo_sk#9L,cs_bill_hdemo_sk#10L,cs_item_sk#20L,cs_promo_sk#21L,cs_order_number#22L,cs_quantity#23L,cs_sold_date_sk#39L] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_..., PartitionCount: 1836, PartitionFilters: [isnotnull(cs_sold_date_sk#39L)], PushedFilters: [IsNotNull(cs_item_sk), IsNotNull(cs_quantity), IsNotNull(cs_bill_cdemo_sk), IsNotNull(cs_bill_hd..., ReadSchema: struct<cs_ship_date_sk:bigint,cs_bill_cdemo_sk:bigint,cs_bill_hdemo_sk:bigint,cs_item_sk:bigint,c...
               :           :           :           :           :           :           :           :           :           +- *(4) Sort [inv_item_sk#40L ASC NULLS FIRST], false, 0
               :           :           :           :           :           :           :           :           :              +- Exchange hashpartitioning(inv_item_sk#40L, 1024)
               :           :           :           :           :           :           :           :           :                 +- *(3) Project [inv_item_sk#40L, inv_warehouse_sk#41L, inv_quantity_on_hand#42L, inv_date_sk#43L]
               :           :           :           :           :           :           :           :           :                    +- *(3) Filter ((isnotnull(inv_item_sk#40L) && isnotnull(inv_quantity_on_hand#42L)) && isnotnull(inv_warehouse_sk#41L))
               :           :           :           :           :           :           :           :           :                       +- *(3) FileScan parquet tpcds_1t_spark230.inventory[inv_item_sk#40L,inv_warehouse_sk#41L,inv_quantity_on_hand#42L,inv_date_sk#43L] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_..., PartitionCount: 261, PartitionFilters: [isnotnull(inv_date_sk#43L)], PushedFilters: [IsNotNull(inv_item_sk), IsNotNull(inv_quantity_on_hand), IsNotNull(inv_warehouse_sk)], ReadSchema: struct<inv_item_sk:bigint,inv_warehouse_sk:bigint,inv_quantity_on_hand:bigint>
               :           :           :           :           :           :           :           :           +- *(8) Sort [w_warehouse_sk#44L ASC NULLS FIRST], false, 0
               :           :           :           :           :           :           :           :              +- Exchange hashpartitioning(w_warehouse_sk#44L, 1024)
               :           :           :           :           :           :           :           :                 +- *(7) Project [w_warehouse_sk#44L, w_warehouse_name#46]
               :           :           :           :           :           :           :           :                    +- *(7) Filter isnotnull(w_warehouse_sk#44L)
               :           :           :           :           :           :           :           :                       +- *(7) FileScan parquet tpcds_1t_spark230.warehouse[w_warehouse_sk#44L,w_warehouse_name#46] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark2..., PartitionFilters: [], PushedFilters: [IsNotNull(w_warehouse_sk)], ReadSchema: struct<w_warehouse_sk:bigint,w_warehouse_name:string>
               :           :           :           :           :           :           :           +- *(12) Sort [i_item_sk#58L ASC NULLS FIRST], false, 0
               :           :           :           :           :           :           :              +- Exchange hashpartitioning(i_item_sk#58L, 1024)
               :           :           :           :           :           :           :                 +- *(11) Project [i_item_sk#58L, i_item_desc#62]
               :           :           :           :           :           :           :                    +- *(11) Filter isnotnull(i_item_sk#58L)
               :           :           :           :           :           :           :                       +- *(11) FileScan parquet tpcds_1t_spark230.item[i_item_sk#58L,i_item_desc#62] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark2..., PartitionFilters: [], PushedFilters: [IsNotNull(i_item_sk)], ReadSchema: struct<i_item_sk:bigint,i_item_desc:string>
               :           :           :           :           :           :           +- *(16) Sort [cd_demo_sk#80L ASC NULLS FIRST], false, 0
               :           :           :           :           :           :              +- Exchange hashpartitioning(cd_demo_sk#80L, 1024)
               :           :           :           :           :           :                 +- *(15) Project [cd_demo_sk#80L]
               :           :           :           :           :           :                    +- *(15) Filter ((isnotnull(cd_marital_status#82) && (cd_marital_status#82 = M)) && isnotnull(cd_demo_sk#80L))
               :           :           :           :           :           :                       +- *(15) FileScan parquet tpcds_1t_spark230.customer_demographics[cd_demo_sk#80L,cd_marital_status#82] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark2..., PartitionFilters: [], PushedFilters: [IsNotNull(cd_marital_status), EqualTo(cd_marital_status,M), IsNotNull(cd_demo_sk)], ReadSchema: struct<cd_demo_sk:bigint,cd_marital_status:string>
               :           :           :           :           :           +- *(20) Sort [hd_demo_sk#89L ASC NULLS FIRST], false, 0
               :           :           :           :           :              +- Exchange hashpartitioning(hd_demo_sk#89L, 1024)
               :           :           :           :           :                 +- *(19) Project [hd_demo_sk#89L]
               :           :           :           :           :                    +- *(19) Filter ((isnotnull(hd_buy_potential#91) && (hd_buy_potential#91 = 1001-5000)) && isnotnull(hd_demo_sk#89L))
               :           :           :           :           :                       +- *(19) FileScan parquet tpcds_1t_spark230.household_demographics[hd_demo_sk#89L,hd_buy_potential#91] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark2..., PartitionFilters: [], PushedFilters: [IsNotNull(hd_buy_potential), EqualTo(hd_buy_potential,1001-5000), IsNotNull(hd_demo_sk)], ReadSchema: struct<hd_demo_sk:bigint,hd_buy_potential:string>
               :           :           :           :           +- *(24) Sort [d_date_sk#94L ASC NULLS FIRST], false, 0
               :           :           :           :              +- Exchange hashpartitioning(d_date_sk#94L, 1024)
               :           :           :           :                 +- *(23) Project [d_date_sk#94L, d_date#96, d_week_seq#98L]
               :           :           :           :                    +- *(23) Filter ((((isnotnull(d_year#100L) && (d_year#100L = 2001)) && isnotnull(d_date_sk#94L)) && isnotnull(d_week_seq#98L)) && isnotnull(d_date#96))
               :           :           :           :                       +- *(23) FileScan parquet tpcds_1t_spark230.date_dim[d_date_sk#94L,d_date#96,d_week_seq#98L,d_year#100L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark2..., PartitionFilters: [], PushedFilters: [IsNotNull(d_year), EqualTo(d_year,2001), IsNotNull(d_date_sk), IsNotNull(d_week_seq), IsNotNull(..., ReadSchema: struct<d_date_sk:bigint,d_date:string,d_week_seq:bigint,d_year:bigint>
               :           :           :           +- *(28) Sort [d_week_seq#126L ASC NULLS FIRST, d_date_sk#122L ASC NULLS FIRST], false, 0
               :           :           :              +- Exchange hashpartitioning(d_week_seq#126L, d_date_sk#122L, 1024)
               :           :           :                 +- *(27) Project [d_date_sk#122L, d_week_seq#126L]
               :           :           :                    +- *(27) Filter (isnotnull(d_week_seq#126L) && isnotnull(d_date_sk#122L))
               :           :           :                       +- *(27) FileScan parquet tpcds_1t_spark230.date_dim[d_date_sk#122L,d_week_seq#126L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark2..., PartitionFilters: [], PushedFilters: [IsNotNull(d_week_seq), IsNotNull(d_date_sk)], ReadSchema: struct<d_date_sk:bigint,d_week_seq:bigint>
               :           :           +- *(32) Sort [d_date_sk#150L ASC NULLS FIRST], false, 0
               :           :              +- Exchange hashpartitioning(d_date_sk#150L, 1024)
               :           :                 +- *(31) Project [d_date_sk#150L, d_date#152]
               :           :                    +- *(31) Filter (isnotnull(d_date#152) && isnotnull(d_date_sk#150L))
               :           :                       +- *(31) FileScan parquet tpcds_1t_spark230.date_dim[d_date_sk#150L,d_date#152] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark2..., PartitionFilters: [], PushedFilters: [IsNotNull(d_date), IsNotNull(d_date_sk)], ReadSchema: struct<d_date_sk:bigint,d_date:string>
               :           +- *(36) Sort [p_promo_sk#178L ASC NULLS FIRST], false, 0
               :              +- Exchange hashpartitioning(p_promo_sk#178L, 1024)
               :                 +- *(35) FileScan parquet tpcds_1t_spark230.promotion[p_promo_sk#178L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark2..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<p_promo_sk:bigint>
               +- *(40) Sort [cr_item_sk#198L ASC NULLS FIRST, cr_order_number#213L ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(cr_item_sk#198L, cr_order_number#213L, 1024)
                     +- *(39) Project [cr_item_sk#198L, cr_order_number#213L]
                        +- *(39) FileScan parquet tpcds_1t_spark230.catalog_returns[cr_item_sk#198L,cr_order_number#213L,cr_returned_date_sk#224L] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark23..., PartitionCount: 2104, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<cr_item_sk:bigint,cr_order_number:bigint>

6.3 Comparing the Plans

In the experimental group every single join is executed as a SortMergeJoin, whereas in the control group many of the small tables get broadcast. The data is the same and the configuration is the same, yet the physical plans are completely different. This is user-visible behavior, so either it is documented somewhere, or it is a bug, or the community has been asleep at the wheel.
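
For context on what drives the choice: the planner broadcasts a build side only if its estimated size stays within spark.sql.autoBroadcastJoinThreshold (204857600 bytes in our config). A minimal sketch of that check, paraphrased from JoinSelection in Spark's SparkStrategies rather than quoted verbatim:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.internal.SQLConf

// Broadcast eligibility, paraphrased: the plan's estimated size must be known
// (>= 0) and no larger than spark.sql.autoBroadcastJoinThreshold.
def canBroadcast(plan: LogicalPlan, conf: SQLConf): Boolean =
  plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold

(plan.stats is the Spark 2.3 API; 2.1.x takes the conf as an argument, plan.stats(conf).) So if every join degenerates into a SortMergeJoin, the size estimates themselves are the first suspect.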

6.4 Pinning Down the Behavior Change

spark-1.6.3-bin-hadoop2.6
spark-2.0.2-bin-hadoop2.7
spark-2.1.0-bin-hadoop2.7
spark-2.1.1-bin-hadoop2.7     
spark-2.1.2-bin-hadoop2.7 
spark-2.2.0-bin-hadoop2.7   
spark-2.2.1-bin-hadoop2.7
spark-2.3.0-bin-hadoop2.7
spark-2.3.1-SNAPSHOT-bin-20180511
spark-2.4.0-SNAPSHOT-bin-20180511
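
The per-version check itself is mechanical. A sketch, assuming each binary's spark-shell points at the same metastore with the TPC-DS tables registered:

// Run the same fact-to-dimension join in each version's spark-shell.
// Per our results, 2.1.x explains it as a BroadcastHashJoin, while
// 2.2.0 and later explain it as a SortMergeJoin.
spark.sql(
  """SELECT *
    |FROM tpcds_1t_spark230.catalog_sales cs
    |JOIN tpcds_1t_spark230.date_dim d ON cs.cs_sold_date_sk = d.d_date_sk
  """.stripMargin).explain()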

Testing the versions above shows that the behavior changed starting from spark-2.2.0-bin-hadoop2.7, so we consulted the relevant migration notes:
http://spark.apache.org/docs/latest/sql-programming-guide.html#upgrading-from-spark-sql-21-to-22

Spark 2.1.1 introduced a new configuration key: spark.sql.hive.caseSensitiveInferenceMode. It had a default setting of NEVER_INFER, which kept behavior identical to 2.1.0. However, Spark 2.2.0 changes this setting’s default value to INFER_AND_SAVE to restore compatibility with reading Hive metastore tables whose underlying file schema have mixed-case column names. With the INFER_AND_SAVE configuration value, on first access Spark will perform schema inference on any Hive metastore table for which it has not already saved an inferred schema. Note that schema inference can be a very time consuming operation for tables with thousands of partitions. If compatibility with mixed-case column names is not a concern, you can safely set spark.sql.hive.caseSensitiveInferenceMode to NEVER_INFER to avoid the initial overhead of schema inference. Note that with the new default INFER_AND_SAVE setting, the results of the schema inference are saved as a metastore key for future use. Therefore, the initial schema inference occurs only at a table’s first access.

Since Spark 2.2.1 and 2.3.0, the schema is always inferred at runtime when the data source tables have the columns that exist in both partition schema and data schema. The inferred schema does not have the partitioned columns. When reading the table, Spark respects the partition values of these overlapping columns instead of the values stored in the data source files. In 2.2.0 and 2.1.x release, the inferred schema is partitioned but the data of the table is invisible to users (i.e., the result set is empty).

The notes are not long, and they are about schema inference; it is fairly safe to conclude they have nothing to do with our problem. Could it actually be a bug, then?

6.5 Debugging

First of all, how Spark decides whether a build-side table can be broadcast is simple: it is just a size comparison, which in the code shows up as Statistics. All Statistics are computed bottom-up starting from the LeafNodes, and without CBO it is even cruder: a Filter has no idea how much it will actually filter out, so it just takes its child's size; a Join has no idea whether its output converges or diverges, so it takes the product of the two sides. Following the logical plan, the relevant code is easy to find:

import java.io.IOException

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, HiveTableRelation}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.DDLUtils

// From org.apache.spark.sql.hive.HiveStrategies (Spark 2.3): fills in
// table-level statistics for Hive tables that have none in the metastore.
class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case relation: HiveTableRelation
        if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
      val table = relation.tableMeta
      val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
        // spark.sql.statistics.fallBackToHdfs=true: compute the real size on HDFS.
        try {
          val hadoopConf = session.sessionState.newHadoopConf()
          val tablePath = new Path(table.location)
          val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
          fs.getContentSummary(tablePath).getLength
        } catch {
          case e: IOException =>
            logWarning("Failed to get table size from hdfs.", e)
            session.sessionState.conf.defaultSizeInBytes
        }
      } else {
        // Default: assume the table is huge (spark.sql.defaultSizeInBytes,
        // i.e. Long.MaxValue), which rules out broadcasting it.
        session.sessionState.conf.defaultSizeInBytes
      }

      val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
      relation.copy(tableMeta = withStats)
  }
}

  1. We notice fallBackToHdfsForStatsEnabled (spark.sql.statistics.fallBackToHdfs). This parameter has the same default value in all the versions above, and we never touched it in our tests.
  2. On 2.2.0 and later, setting spark.sql.statistics.fallBackToHdfs=true lets the small tables be broadcast again (the one-line workaround is spelled out after this list).
  3. Further debugging also confirms that the Statistics are generated correctly and survive all the Relation conversions, so this is not a bug.
  4. A behavior change that is neither documented nor a bug: so the community was asleep at the wheel after all.
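
The workaround is therefore a single line, in the same spark-defaults style as section 4.3 (it can also be applied per session with a SET statement, since this is a runtime SQL conf):

## Workaround on 2.2.0+: compute Hive table sizes from HDFS ##
spark.sql.statistics.fallBackToHdfs                   true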

P.S. Judging by this parameter's literal meaning, it would not even be unfair to call the pre-2.2.0 behavior the bug. Two wrongs, apparently, made a right.

7 Summary

  1. The performance regressions can basically be worked around by setting spark.sql.statistics.fallBackToHdfs=true. Whether this covers most of them or all of them depends on the next round of benchmark runs; we will follow up if new issues appear.
  2. The Spark community's ambivalence toward Hive goes way back, and its support for Hive tables keeps shifting accordingly, so this kind of negligence was foreseeable. The Spark-on-Hive code is a mess. I have changed quite a bit of it myself, and basically every committer seems to have a fairly weak grip on this area, perhaps because it is fated to be abandoned one day. Never mind how bad the spark thrift server module is; if you are interested, head over to Kyuubi for a decidedly better experience.
  3. Spark 2.3.0 in production is basically vetoed. Besides this issue, which a single parameter can probably resolve, there is another fatal one:
    https://issues.apache.org/jira/browse/SPARK-23852
    Better to wait for 2.3.1. The community is already discussing it; presumably once the blocker above is resolved, the release will be on the agenda.

8 Postscript

Spring sleep, unaware of dawn,
yet everywhere the phones alarm.
All night long the pagers cried:
how many disks blew up outside?
Shuffled till my stomach aches,
I'll go hang from the southeast branch.
