On the DDL data-loss problem since 8.0.27

1. Background

First, the affected range: every version from 8.0.27 up to the latest. The bug was introduced together with parallel DDL, and the probability of hitting it depends on the concurrent update/delete load on the system; it has nothing to do with the configured DDL parallelism (all tests below use a single thread).
Rex first brought this problem to me, along with an analysis of the cursor-restore issue; 丁奇 has also written about it, and Percona fixed it recently.

Two bugs are involved, described below.

2. Reproducing the problem

Both bugs can be reproduced following Percona's approach (tested on 8.0.36). First, rule out the effect of parallel DDL by setting the following parameters:

  • innodb_parallel_read_threads=1: the parallel-DDL read phase uses one thread; inserting into the primary-key table is also done by one thread.
  • innodb_ddl_threads=1: the parallel-DDL secondary-index rebuild phase (sorting, merging, etc.) uses one thread.
  • innodb_purge_threads=1: one purge thread, cleaning delete-marked records and undo.
2.1 Data loss (debug build)
Prepare the data:
CREATE TABLE t1 (pk CHAR(5) PRIMARY KEY);
INSERT INTO t1 VALUES ('aaaaa'), ('bbbbb'), ('bbbcc'), ('ccccc'), ('ddddd'), ('eeeee');
Steps in two sessions, S1 and S2:
1. Make the key buffer fill after just 2 rows:
SET DEBUG='+d,ddl_buf_add_two';
2. Stop purge, then delete a row:
set global innodb_purge_stop_now=ON;
DELETE FROM t1 WHERE pk = 'bbbcc';
3. Run the DDL and break at ddl0par-scan.cc:238:
ALTER TABLE t1 ENGINE=InnoDB, ALGORITHM=INPLACE
4. Resume purge:
SET GLOBAL innodb_purge_run_now=ON;
5. Let the DDL continue (a row is lost).

The test result is as follows:


3289e6e89dd438943000db167118f88.png
2.2 Duplicate records (debug build)
Prepare the data:
CREATE TABLE t1 (pk CHAR(5) PRIMARY KEY);
INSERT INTO t1 VALUES ('aaaaa'), ('bbbbb'), ('ccccc'), ('ddddd'), ('eeeee');
Steps in two sessions, S1 and S2:
1. Make the key buffer fill after just 2 rows:
SET DEBUG='+d,ddl_buf_add_two';
2. In S2, run the DDL and break at ddl0par-scan.cc:238:
ALTER TABLE t1 ENGINE=InnoDB, ALGORITHM=INPLACE
3. In S1, insert a row:
begin; INSERT INTO t1 VALUES ('bbbcc');
4. Let the DDL continue: duplicate-row error.
98867a2b9051f4923df642e1556dbea.png
2.3 Adding a column with INPLACE

This test follows the procedure of 2.1, with the statement changed to ALTER TABLE t1 add col1 int, ALGORITHM=INPLACE;

image.png

One row of data is lost.

An error is usually something we can accept, but losing data is a real problem.

3. Inplace data read and index rebuild

This bug exists in inplace DDL operations that need to scan the data; it does not affect the COPY or INSTANT algorithms. An inplace parallel DDL actually consists of two parts, data read and index rebuild, whose parallelism is controlled separately by innodb_parallel_read_threads and innodb_ddl_threads. Even with both set to 1, the parallel code path is still executed. Setting multi-threading aside, the discussion below covers single-threaded read and load in two parts.

3.1 Data read

The read phase determines from the DDL statement how many indexes need to be rebuilt and assigns one builder per index. The rule is: if the primary key is changed and the algorithm is INPLACE, the primary key must be rebuilt, all indexes must be rebuilt, and a temporary #sql-ibXXX data file is created, as follows:


image.png

If the DDL only adds an index, the primary key is not rebuilt and no #sql-ibXXX data file is created.
When the primary key must be rebuilt, the existing primary-key records are read into a primary-key key_buffer. When that buffer fills, its contents are inserted directly into the new #sql-ibXXX data file, because the rows are already in primary-key order, and the buffer is then released. While reading the primary key, the rows are also placed into each secondary index's key_buffer. Secondary-index entries are of course unordered, so when a secondary key_buffer fills it cannot be inserted into the #sql-ibXXX file directly and must be written to a temporary file instead, visible as:

 connectio 1479 1552   mysql   49u      REG               8,16   10670080    3694366 /pxc/mysql8036/tmp/#3694366 (deleted)

This repeats until the scan completes.
If the DDL only adds an index, only the primary-key data is read into the key_buffer; when the buffer fills it is written straight to the temporary file, and the rebuild phase does the rest.
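The buffer handling above can be sketched as a toy model (pure Python, not InnoDB code; `scan`, `BUFFER_CAPACITY`, and the two-row capacity that mimics the ddl_buf_add_two debug point are all illustrative):

```python
# Toy model of the scan phase's buffer handling (illustrative, not InnoDB code).
# PK rows arrive in key order, so a full PK buffer is appended straight to the
# new #sql-ib data file; secondary-index buffers are unsorted, so each full
# buffer is sorted and spilled to the temp file as one run for a later merge.

BUFFER_CAPACITY = 2  # mimics the ddl_buf_add_two debug point: full after 2 rows

def scan(pk_rows, secondary_key):
    new_pk_file = []     # stands in for the #sql-ibXXX data file
    temp_file_runs = []  # sorted runs spilled for the secondary index
    pk_buf, sec_buf = [], []
    for row in pk_rows:  # rows come back in primary-key order
        pk_buf.append(row)
        sec_buf.append(secondary_key(row))
        if len(pk_buf) == BUFFER_CAPACITY:
            new_pk_file.extend(pk_buf)              # already sorted: direct insert
            temp_file_runs.append(sorted(sec_buf))  # unsorted: sort, spill one run
            pk_buf, sec_buf = [], []
    if pk_buf:           # flush the final partial buffers
        new_pk_file.extend(pk_buf)
        temp_file_runs.append(sorted(sec_buf))
    return new_pk_file, temp_file_runs
```

With a secondary key that disagrees with primary-key order, the PK file comes out in scan order while each spilled run is only sorted within itself, which is why a merge pass is needed later.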

3.2 Index rebuild

This part sorts the temporary files and rebuilds the indexes in stages, as follows:


image.png

Since this part is not where the problem lies, it is left for future study.

4. The problem

The core of the bug lies in DDL statements that rebuild the primary-key index with the INPLACE algorithm: once the key buffer fills, its rows must be inserted directly into the #sql-ibXXX data file. At that point the scan may be somewhere in the middle of a page; before inserting, the page mutex is released temporarily and the cursor position is saved into a persistent cursor, and after the insert the cursor is restored from it. This is presumably done to improve concurrency on page modifications, but the save and restore of the persistent cursor is exactly where things go wrong: the page contents may change in between. The two kinds of change correspond to the two bugs above:

  • The purge thread physically removes delete-marked records.
  • Another thread inserts rows.

For the details of how the cursor save/restore goes wrong, see Rex's article. In short, after restore the cursor points at the wrong record, which can lose a row or process one twice; the core is the following code:
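The data-loss side of the hazard can be illustrated with a toy model (pure Python, not InnoDB code; `restore` and `check_exact` are invented names). Assume the scan processed up to the delete-marked record 'bbbcc' and stored it as the cursor position; while the latch was released, purge removed it physically:

```python
import bisect

def restore(page, stored_key, check_exact):
    """Toy model of restoring a persistent cursor. stored_key is the last
    record the scan processed before the page latch was released."""
    i = bisect.bisect_left(page, stored_key)   # first key >= stored_key
    on_stored = i < len(page) and page[i] == stored_key
    if check_exact and not on_stored:
        return page[i:]    # stored record was purged: we already sit on its successor
    return page[i + 1:]    # step past the (already processed) stored record

# 'bbbcc' was delete-marked, processed, and stored as the cursor position;
# purge then removed it physically while the latch was free:
page_after_purge = ['aaaaa', 'bbbbb', 'ccccc', 'ddddd', 'eeeee']
lost = restore(page_after_purge, 'bbbcc', check_exact=False)  # skips 'ccccc'
safe = restore(page_after_purge, 'bbbcc', check_exact=True)   # keeps 'ccccc'
```

The naive restore searches for the stored key, lands on its successor 'ccccc', and then still steps forward once, so 'ccccc' never reaches the new table; the checked variant notices it is no longer on the stored record and does not advance.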


image.png

5. Workarounds

As things stand, the risk is running one of the affected DDL statements under heavy concurrent update/delete load; the main worry is data loss, an error being still tolerable. The affected statements are inplace DDL that rebuilds the primary key, for example:

  • alter table engine=innodb (rebuilds the primary key)
  • alter table add col int, ALGORITHM=INPLACE (modifies and therefore rebuilds the primary key)
  • alter table add col int after ** (on 8.0.27/8.0.28, modifies and rebuilds the primary key)
  • alter table add/drop col (when the instant version count exceeds 64 and forces a rebuild)

Given the analysis above, the risk can be minimized as follows:

  • Enlarge the key buffer, i.e. innodb_ddl_buffer_size. The default is 1M; raising it to 128M cuts the trigger probability to roughly 1/128 of the original, and small tables may stop triggering at all. Note that a scan involving the primary key cannot use parallel read ("Parallel scan will break the order"). Each buffer is roughly innodb_ddl_buffer_size / (number of indexes * number of read threads).
  • Before the DDL, open an oldest read view as follows to keep the purge thread from purging records. This does not help if old purge work is still pending, so first check the purge thread's state in SHOW ENGINE output:
In session s1 (leave the transaction open):
set transaction_isolation='repeatable-read';
begin; select * from test;  -- any empty table will do; do not commit
In session s2:
show engine innodb status;  -- confirm the purge thread is already idle
then run the risky DDL.

In testing, this approach keeps the data-loss problem from reproducing, but the duplicate-key scenario remains: it avoids the data-loss bug but cannot avoid the duplicate bug.

  • Naturally, run risky DDL during off-peak hours to lower the trigger probability.
  • Use the INSTANT algorithm whenever possible, and run risky DDL with the table-locking COPY algorithm, which avoids the bug entirely.
  • Run risky DDL through pt-online-schema-change, which also avoids it entirely.
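The arithmetic behind the first point can be sketched as follows (illustrative names, plus the assumption that every buffer fill opens exactly one risky latch-release window):

```python
# Back-of-envelope model of the innodb_ddl_buffer_size mitigation.
# Each (index, read-thread) pair gets roughly
#   innodb_ddl_buffer_size / (n_indexes * n_threads)
# bytes, and every time that buffer fills, one risky save/restore
# window opens while the page latch is released.

MiB = 1024 * 1024

def per_buffer_bytes(ddl_buffer_size, n_indexes, n_threads):
    return ddl_buffer_size // (n_indexes * n_threads)

def risky_windows(table_bytes, ddl_buffer_size, n_indexes=1, n_threads=1):
    # number of times the buffer fills while scanning the table
    return table_bytes // per_buffer_bytes(ddl_buffer_size, n_indexes, n_threads)

small = risky_windows(1024 * MiB, 1 * MiB)    # default 1M buffer
large = risky_windows(1024 * MiB, 128 * MiB)  # raised to 128M
```

For a 1 GiB table this gives 1024 risky windows at the 1M default but only 8 at 128M, the factor-of-128 reduction mentioned above.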

Finally, this bug is due to be fixed in 8.0.41:


afc2c286640af54165b45700d8a9794.png

Percona has already fixed it:


image.png

6. Additional notes

8036


ddl::Builder ---> ddl::Context &m_ctx 
                    --->Loader &m_loader;                                 
                          loader
                         


ddl::Builder::Thread_ctx
Parallel_reader::Ctx.m_thread_ctx.m_pcursor
Parallel_reader::Ctx.Scan_ctx contains the scan callback functions


#0  ddl::Context::scan_buffer_size (this=0x7fff9011ea40, n_threads=1) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0ctx.cc:150
#1  0x000000000503d5f6 in ddl::Builder::init (this=0x7fff64ceebb0, cursor=..., n_threads=1) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0builder.cc:646
#2  0x0000000004f3512d in ddl::Parallel_cursor::scan (this=0x7fff64ceec80, builders=std::vector of length 1, capacity 1 = {...}) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0par-scan.cc:135
#3  0x0000000004f289ee in ddl::Loader::scan_and_build_indexes (this=0x7fff9011e9a0) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0loader.cc:448
#4  0x0000000004f28b15 in ddl::Loader::build_all (this=0x7fff9011e9a0) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0loader.cc:480
#5  0x0000000004f1130c in ddl::Context::build (this=0x7fff9011ea40) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0ctx.cc:510
#6  0x0000000004a26f19 in ha_innobase::inplace_alter_table_impl<dd::Table> (this=0x7fff64d4f010, altered_table=0x7fff64d30050, ha_alter_info=0x7fff9011f6e0)
    at /pxc/mysql-8.0.36/storage/innobase/handler/handler0alter.cc:6360
#7  0x00000000049ff3fe in ha_innobase::inplace_alter_table (this=0x7fff64d4f010, altered_table=0x7fff64d30050, ha_alter_info=0x7fff9011f6e0, old_dd_tab=0x7fff64d176d0, new_dd_tab=0x7fff64d194d0)
    at /pxc/mysql-8.0.36/storage/innobase/handler/handler0alter.cc:1557
#8  0x00000000033d5695 in handler::ha_inplace_alter_table (this=0x7fff64d4f010, altered_table=0x7fff64d30050, ha_alter_info=0x7fff9011f6e0, old_table_def=0x7fff64d176d0, 
    new_table_def=0x7fff64d194d0) at /pxc/mysql-8.0.36/sql/handler.h:6180
    
ddl::Context::Context    

#0  ddl::Context::Context (this=0x7fff007e9a40, trx=0x7fffe5676798, old_table=0x7fffdb310f48, new_table=0x7fffdb310f48, online=true, indexes=0x7fffdb324018, key_numbers=0x7fffdb324040, 
    n_indexes=1, table=0x7fffdb3200f0, add_cols=0x0, col_map=0x0, add_autoinc=18446744073709551615, sequence=..., skip_pk_sort=false, stage=0x7fffdbc70b50, add_v=0x0, eval_table=0x7fffdb3200f0, 
    max_buffer_size=1048576, max_threads=4) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0ctx.cc:67
#1  0x0000000004a26f06 in ha_innobase::inplace_alter_table_impl<dd::Table> (this=0x7fffdb311950, altered_table=0x7fffdb3200f0, ha_alter_info=0x7fff007ea6e0)
    at /pxc/mysql-8.0.36/storage/innobase/handler/handler0alter.cc:6353
#2  0x00000000049ff3fe in ha_innobase::inplace_alter_table (this=0x7fffdb311950, altered_table=0x7fffdb3200f0, ha_alter_info=0x7fff007ea6e0, old_dd_tab=0x7fffdb2f7390, new_dd_tab=0x7fffdbc60a40)
    at /pxc/mysql-8.0.36/storage/innobase/handler/handler0alter.cc:1557
#3  0x00000000033d5695 in handler::ha_inplace_alter_table (this=0x7fffdb311950, altered_table=0x7fffdb3200f0, ha_alter_info=0x7fff007ea6e0, old_table_def=0x7fffdb2f7390, 
    new_table_def=0x7fffdbc60a40) at /pxc/mysql-8.0.36/sql/handler.h:6180
#4  0x00000000033b7f2d in mysql_inplace_alter_table (thd=0x7fffdb2224d0, schema=..., new_schema=..., table_def=0x7fffdb2f7390, altered_table_def=0x7fffdbc60a40, table_list=0x7fffdb33e1f8, 
    table=0x7fffdb305ca0, altered_table=0x7fffdb3200f0, ha_alter_info=0x7fff007ea6e0, inplace_supported=HA_ALTER_INPLACE_NO_LOCK_AFTER_PREPARE, alter_ctx=0x7fff007eb600, 
    columns=std::set with 0 elements, fk_key_info=0x7fffdb32af78, fk_key_count=0, fk_invalidator=0x7fff007eb530) at /pxc/mysql-8.0.36/sql/sql_table.cc:13533
#5  0x00000000033c40a8 in mysql_alter_table (thd=0x7fffdb2224d0, new_db=0x7fffdb33e850 "optest", new_name=0x0, create_info=0x7fff007ed150, table_list=0x7fffdb33e1f8, alter_info=0x7fff007ecfe0)
    at /pxc/mysql-8.0.36/sql/sql_table.cc:17501
#6  0x00000000039eb610 in Sql_cmd_alter_table::execute (this=0x7fffdb33e9d8, thd=0x7fffdb2224d0) at /pxc/mysql-8.0.36/sql/sql_alter.cc:349
#7  0x00000000032ddb84 in mysql_execute_command (thd=0x7fffdb2224d0, first_level=true) at /pxc/mysql-8.0.36/sql/sql_parse.cc:4721
#8  0x00000000032dfe21 in dispatch_sql_command (thd=0x7fffdb2224d0, parser_state=0x7fff007ee910) at /pxc/mysql-8.0.36/sql/sql_parse.cc:5370
#9  0x00000000032d5b61 in dispatch_command (thd=0x7fffdb2224d0, com_data=0x7fff007efa00, command=COM_QUERY) at /pxc/mysql-8.0.36/sql/sql_parse.cc:2054
#10 0x00000000032d3aad in do_command (thd=0x7fffdb2224d0) at /pxc/mysql-8.0.36/sql/sql_parse.cc:1439
#11 0x00000000034f39d5 in handle_connection (arg=0xafe3800) at /pxc/mysql-8.0.36/sql/conn_handler/connection_handler_per_thread.cc:302
#12 0x000000000518ad94 in pfs_spawn_thread (arg=0xafdd780) at /pxc/mysql-8.0.36/storage/perfschema/pfs.cc:3042
#13 0x00007ffff7bc6ea5 in start_thread () from /lib64/libpthread.so.0
#14 0x00007ffff6370b0d in clone () from /lib64/libc.so.6

  @param[in] trx                Transaction.
  @param[in] old_table          Table where rows are read from
  @param[in] new_table          Table where indexes are created; identical to
                                old_table unless creating a PRIMARY KEY
  @param[in] online             True if creating indexes online
  @param[in] indexes            Indexes to be created
  @param[in] key_numbers        MySQL key numbers
  @param[in] n_indexes          Size of indexes[]
  @param[in,out] table          MySQL table, for reporting erroneous key
                                value if applicable
  @param[in] add_cols           Default values of added columns, or NULL
  @param[in] col_map            Mapping of old column numbers to new
                                ones, or nullptr if old_table == new_table
  @param[in] add_autoinc        Number of added AUTO_INCREMENT columns, or
                                ULINT_UNDEFINED if none is added
  @param[in,out] sequence       Autoinc sequence
  @param[in] skip_pk_sort       Whether the new PRIMARY KEY will follow
                                existing order
  @param[in,out] stage          Performance schema accounting object,
                                used by ALTER TABLE.
                                stage->begin_phase_read_pk() will be called
                                at the beginning of this function and it will
                                be passed to other functions for further
                                accounting.
  @param[in] add_v              New virtual columns added along with indexes
  @param[in] eval_table         MySQL table used to evaluate virtual column
                                value, see innobase_get_computed_value().
  @param[in] max_buffer_size    Memory use upper limit.
  @param[in] max_threads        true if DDL should use multiple threads. */



class Loader {
 public:
  /** Builder task. */
  struct Task {
    /** Constructor. */
    Task() = default;

    /** Constructor.
    @param[in,out] builder        Builder that performs the operation. */
    explicit Task(Builder *builder) : m_builder(builder) {}

    /** Constructor.
    @param[in,out] builder        Builder that performs the operation.
    @param[in] thread_id          Index value of the thread_state to work on. */
    explicit Task(Builder *builder, size_t thread_id)
        : m_builder(builder), m_thread_id(thread_id) {}

    /** Do the operation.
    @return DB_SUCCESS or error code. */
    [[nodiscard]] dberr_t operator()() noexcept;

   private:
    /** Builder instance. */
    Builder *m_builder{};

    /** Thread state index. */
    size_t m_thread_id{std::numeric_limits<size_t>::max()};

    friend class Loader;
  };

  // Forward declaration
  class Task_queue;

  /** Constructor.
  @param[in,out] ctx            DDL context. */
  explicit Loader(ddl::Context &ctx) noexcept;

  /** Destructor. */
  ~Loader() noexcept;

  /** Build the read instance.
  @return DB_SUCCESS or error code. */
  [[nodiscard]] dberr_t build_all() noexcept;

  /** Add a task to the task queue.
  @param[in] task               Task to add. */
  void add_task(Task task) noexcept;

  /** Validate the indexes (except FTS).
  @return true on success. */
  [[nodiscard]] bool validate_indexes() const noexcept;

 private:
  /** Prepare to build and load the indexes.
  @return DB_SUCCESS or error code. */
  [[nodiscard]] dberr_t prepare() noexcept;

  /** Load the indexes.
  @return DB_SUCCESS or error code. */
  [[nodiscard]] dberr_t load() noexcept;

  /** Scan and build the indexes.
  @return DB_SUCCESS or error code. */
  [[nodiscard]] dberr_t scan_and_build_indexes() noexcept;

 private:
  /** DDL context, shared by the loader threads. */
  ddl::Context &m_ctx;

  /** If true then use parallel scan and index build. */
  bool m_parallel{};

  /** Sort buffer size. */
  size_t m_sort_buffer_size{};

  /** IO buffer size. */
  size_t m_io_buffer_size{};

  /** Index builders. */
  Builders m_builders{};

  /** Task queue. */
  Task_queue *m_taskq{};
};




ha_innobase::inplace_alter_table_impl
   ->ddl::Context::Context
      Initializes the ddl::Context structure; the parallel thread count and the parallel buffer size are both clearly visible here
      ->m_need_observer = m_old_table != m_new_table; // did the table itself change?
      ->for (size_t i = 0; i < n_indexes; ++i) { // number of indexes to rebuild; each is appended to the m_indexes vector
         ->m_indexes.push_back(indexes[i]); // store into the m_indexes array
         ->if (i == 0) { // for the first index, compute how many fields uniqueness needs
            -> m_n_uniq = dict_index_get_n_unique(m_indexes.back());
               number of fields needed for uniqueness
            ->if (!dict_index_is_spatial(m_indexes.back())) { // if it is not a spatial index
               ->m_need_observer = true; // an ordinary index needs a flush observer
            -> m_key_numbers.push_back(key_numbers[i]); // index number (8?)
            ->if (m_need_observer) { // if an observer is needed, prepare for flushing dirty pages; they must be flushed before the redo log is applied
                  auto observer = ut::new_withkey<Flush_observer>(
                  ut::make_psi_memory_key(mem_key_ddl), space_id, m_trx, m_stage);  // m_stage is the performance_schema stage; space_id is carried here
                  trx_set_flush_observer(m_trx, observer); // attach the observer
            ->mutex_create(LATCH_ID_DDL_AUTOINC, &m_autoinc_mutex); // what is this mutex for?
            ->if (m_add_cols != nullptr) { // if new columns also need to be added
               ...
   ->ddl::Context::build
     -> loader{*this}; //Loader::Loader(Context &ctx) noexcept : m_ctx(ctx) {}
         just a simple construction, storing the prepared ddl::Context
     ->ddl::Loader::build_all  performs the data read?   ddl0loader.cc:480
        ->ddl::Loader::prepare  ddl0loader.cc:366
           for (size_t i = 0; i < m_ctx.m_indexes.size(); ++i)
           loop over each index to build
           -> ddl::Context::setup_fts_build
               full-text indexes; usually none, returns DB_SUCCESS
           -> ddl::Builder::Builder
               mainly initializes the builder with the Loader and the Context
           ->m_builders.push_back(builder)
              appends the builder to the Loader's m_builders array; this shows
              that each index to build gets its own builder
        ->ddl::Loader::scan_and_build_indexes    ddl0loader.cc:448
           ->auto cursor = Cursor::create_cursor(m_ctx)
              ddl::Cursor::create_cursor builds a cursor from the context; constructor
              ->Cursor(ctx), stores the context into the cursor
                 ddl::Parallel_cursor::Parallel_cursor
              ->m_index(const_cast<dict_index_t *>(m_ctx.index())), builds the iterator from the old table's index structure: return m_old_table->first_index()
              ->m_single_threaded_mode(m_ctx.has_virtual_columns() || m_ctx.has_fts_indexes()) whether to use single-threaded mode; this shows that virtual columns or full-text indexes rule out parallel DDL; the iteration is over the old table
           ->auto err = m_ctx.read_init(cursor)
              ddl::Context::read_init
              ->m_cursor = cursor
              ->setup_nonnull();
                 ddl::Context::setup_nonnull
                 returns immediately if the table structure is unchanged
              ->setup_pk_sort
                ddl::Context::setup_pk_sort
                ->if (m_skip_pk_sort)
                   is primary-key sorting needed?
                ->otherwise
                   return DB_SUCCESS
           ->cursor->open()
             ddl::Parallel_cursor::open
             ? an empty function
           ->innobase_rec_reset
             Resets table->record[0]
           ->cursor->scan(m_builders)
             ddl::Parallel_cursor::scan (ddl0par-scan.cc:135)
             -> auto use_n_threads = thd_parallel_read_threads(m_ctx.m_trx->mysql_thd);
                   the read parallelism, e.g. 4 by default; this is innodb_parallel_read_threads
             ->if (use_n_threads > 1)
                  if the configured read parallelism is above 1, decide per index whether a parallel read is actually possible
                  for (auto &builder : builders)
                  check each index
                 ->if (builder->is_skip_file_sort() || builder->is_spatial_index())
                     can the scan skip parallelism? (generally anything that leaves the primary key alone can avoid re-sorting after the scan? when skip file sort applies still needs study, TODO; scans that rebuild the primary key cannot be parallel)
                     but in practice most cases still take the parallel scan.
                      m_single_threaded_mode = true;
                 -> if (!m_single_threaded_mode)  note: only when parallelism is used
                     n_threads = Parallel_reader::available_threads(use_n_threads, false)
                     check how many threads can actually be allocated; the instance-wide default maximum is 256
                -> const auto use_n_threads = n_threads == 0 ? 1 : n_threads;
                  because n_threads was initialized to 0 when no parallel read is needed, use_n_threads becomes 1 here,
                  and it is declared const.
             ->Builders batch_insert{};
                 // prepared for spatial indexes
             ->for (auto &builder : builders)
               loop over every index; one builder per index is initialized
               ->ddl::Builder::init (ddl0builder.cc:630)
                 ->auto buffer_size = m_ctx.scan_buffer_size(n_threads)
                   where innodb_ddl_buffer_size takes effect
                   returns a pair of values:
                   the first is innodb_ddl_buffer_size / (thread count * number of indexes to rebuild)
                   the second is the IO buffer, related to row size: a page must hold 2 rows plus alignment, roughly 10K, though it debugged to 130K depending on the final computation
                 ->m_sort_index == m_index
                   the new index
                 ->for (size_t i = 0; i < n_threads; ++i)
                   err = create_thread_ctx(i, m_sort_index);
                   m_sort_index is the new index, i the thread ID; one per concurrent read thread
                   create_thread_ctx is a lambda
                   ->key_buffer = ut::new_withkey<Key_sort_buffer>(
                                          ut::make_psi_memory_key(mem_key_ddl), index, buffer_size.first)
                     initializes the key_sort_buffer: the memory used by the scan
                   ->auto thread_ctx = ut::new_withkey<Thread_ctx>(    // constructor ddl::Builder::Thread_ctx::Thread_ctx
                      ut::make_psi_memory_key(mem_key_ddl), id, key_buffer);
                     initializes the Thread_ctx via
                     ddl::Builder::Thread_ctx::Thread_ctx,
                     the per-thread context, built from key_buffer and id;
                     this is where m_key_buffer comes in
                   ->m_thread_ctxs.push_back(thread_ctx)
                      appended to m_thread_ctxs, a std::vector<Thread_ctx *, Allocator> container
                   ->thread_ctx->m_aligned_buffer =  ut::make_unique_aligned<byte[]>(ut::make_psi_memory_key(mem_key_ddl),
                                       UNIV_SECTOR_SIZE, buffer_size.second);
                      each thread_ctx gets an aligned buffer of buffer_size.second bytes
                   ->thread_ctx->m_io_buffer = {thread_ctx->m_aligned_buffer.get(),
                              buffer_size.second};
                       each thread_ctx gets the aligned IO buffer
                    ...
                    ->if (builder->is_spatial_index())
                       if it is a spatial index
                      ->batch_insert.push_back(builder)
                         append it to batch_insert
             ->using Rows = std::vector<Row, ut::allocator<Row>>
             ->using Row_counters = std::vector<size_t, ut::allocator<size_t>>
               two new type aliases
             ->Rows rows{};
             ->Row_counters n_rows{};
               two vector containers
             ->rows.resize(use_n_threads); // one Row per read thread
             ->n_rows.resize(rows.size()); // one size_t counter per read thread
             ->for (size_t i = 0; i < use_n_threads; ++i) {
               loop over each read thread
               ->m_heaps.push_back(mem_heap_create(1024, UT_LOCATION_HERE))
                  allocate heap memory
               ->rows[i].m_add_cols = m_ctx.create_add_cols()
                  the columns to be added
             ->Parallel_reader reader{n_threads};
                 n_threads is 0 when there is no parallelism, otherwise the number of concurrent read threads
               ->Parallel_reader::Parallel_reader (row0pread.cc:188)
                   this is just initialization:
                   m_max_threads(max_threads)
                   m_n_threads(max_threads)
                   m_ctxs() the contexts are merely created here
                   m_sync(max_threads == 0) whether the read is synchronous
                   m_n_completed = 0 number completed
                   create the mutex structure
             ->const Parallel_reader::Scan_range FULL_SCAN;
                  the scan range
             ->four callbacks are defined:
             ->batch_inserter
             ->bulk_inserter
             ->reader.set_start_callback
             ->reader.set_finish_callback
             ->Parallel_reader::add_scan
                ->scan_ctx = std::shared_ptr<Scan_ctx>(ut::new_withkey<Scan_ctx>(UT_NEW_THIS_FILE_PSI_KEY, this, m_scan_ctx_id, trx, config, std::move(f)),[](Scan_ctx *scan_ctx) { ut::delete_(scan_ctx); });
                   initializes the Scan_ctx structure, with the lambda passed in
                ->Parallel_reader::Scan_ctx::partition
                  ->Parallel_reader::Scan_ctx::create_ranges
             ->Parallel_reader::run
                performs the read, in parallel or single-threaded
                 
                    
  
  
Parallel_reader::Scan_ctx::create_ranges
   creates the scan ranges and stores them in the Ranges container
   ->Parallel_reader::Scan_ctx::create_range
                                         
                     
Parallel_reader::run
   ->Parallel_reader::spawn
      ->Parallel_reader::parallel_read
      



Parallel_reader::worker
  ->Parallel_reader::Ctx::traverse
    ->Parallel_reader::Ctx::traverse_recs
    

 
ddl::Loader::load(ddl0loader.cc:289)
  ->
   
   
   ddl::Loader::Task

Copy_ctx 



Writing one row into the Key_sort_buffer:
Builder::add_to_key_buffer
  ->Builder::copy_row
    ddl0builder.cc:1124
    ->key_buffer = m_thread_ctxs[ctx.m_thread_id]->m_key_buffer
       first fetch the key buffer
    -> if (unlikely(key_buffer->full()))
      then check whether it is already full; this governs overflow, i.e. DB_OVERFLOW when the buffer is too small
   ->Builder::copy_columns
      allocates dfield_t pointing at the actual data
       ->key_buffer = m_thread_ctxs[ctx.m_thread_id]->m_key_buffer;
          get the key buffer
       ->&fields = key_buffer->m_dtuples[key_buffer->size()];
          fields refers to m_n_tuples, how much data there actually is
      ->dtuple_get_nth_field(ctx.m_row.m_ptr, col_no)
         the row just read is in ctx.m_row; fetch the field
      ->dfield_copy(field, src_field)
         only repoints the pointer; no data is copied
   ->ddl::Key_sort_buffer::deep_copy
      ddl0buffer.cc:95
      ->dfield_dup(field++, m_heap)
         loops over each field and copies the actual data into
         dfield_t's void* data, i.e. copies the data
         from its original location into the tuples
         in the key buffer
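The difference between the two copy steps can be mimicked in a few lines (a toy contrast, not InnoDB code; `dfield_copy_like` and `dfield_dup_like` are invented names):

```python
# Toy contrast between the two copy steps above: dfield_copy only repoints
# at the source row (shallow), while dfield_dup in Key_sort_buffer::deep_copy
# duplicates the bytes into the buffer's own heap, so they survive once the
# source row's memory is reused.

def dfield_copy_like(row):
    return list(row)                       # same bytearray objects: shallow

def dfield_dup_like(fields):
    return [bytearray(f) for f in fields]  # private copies ("the heap")

src_row = [bytearray(b'aaaaa'), bytearray(b'42')]
shallow = dfield_copy_like(src_row)
deep = dfield_dup_like(shallow)
src_row[0][0] = ord('z')                   # the source row changes later
```

After the source row mutates, the shallow copy follows it while the deep copy still holds the original bytes, which is exactly why deep_copy must run before the scan moves on.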


ddl::Builder::add_row (ddl0builder.cc:1593)
  ->ddl::Builder::bulk_add_row (ddl0builder.cc:1564)
     ->Copy_ctx ctx{row, m_ctx.m_eval_table, thread_id};
        ctx is only a reference holder
       loop until all data has been read
       ->ddl::Builder::add_to_key_buffer
          returns when the buffer is full or the read is complete
       ->key_buffer_sort(thread_id);
          because the buffer is full, or the read is complete, sort within the sort buffer
       ->key_buffer->serialize(io_buffer, persistor);
          persist to the temporary file
          mysqld    1479        mysql   48u      REG               8,16      24576    3694322 /pxc/mysql8036/tmp/#3694322 (deleted)
                                                                                                          49152    3694322 /pxc/mysql8036/tmp/#3694322 (deleted)
          


Parallel_reader::Ctx::traverse
    ->PCursor pcursor(from->m_pcur, &mtr, m_scan_ctx->m_config.m_read_level);
    ->m_thread_ctx->m_pcursor = &pcursor;
         initializes the cursor and stores it in
         Parallel_reader::Ctx.m_thread_ctx.m_pcursor
   -> Parallel_reader::Ctx::traverse_recs(PCursor *pcursor, mtr_t *mtr)
      loops over every row, reading and processing it
      -> err = m_scan_ctx->m_f(this);
          processes the row, calling ddl::Builder::add_row
      ->page_cur_move_to_next
         moves to the next record
         



          


Copy_ctx points at the current row's data; the row is copied into the key buffer cache, and for secondary indexes the buffer spills to a temporary file:

Copy_ctx --> Builder::Thread_ctx->m_key_buffer --> temporary file
   |
   | points to
   v
cursor? row





After the scan completes, Loader::load handles rebuilding the remaining indexes, but not the primary key, which was inserted directly.



Parallel_reader::Thread_ctx::savepoint
   ->m_pcursor->savepoint
      PCursor::savepoint
        -> m_pcur->move_to_prev_on_page()
           btr_pcur_t::move_to_prev_on_page
        ->m_pcur->store_position(m_mtr)
            btr_pcur_t::store_position
        ->m_mtr->commit()
           the mtr commit writes the redo and releases the page latch


Parallel_reader::Thread_ctx::restore_from_savepoint
   ->m_pcursor->restore_from_savepoint
      PCursor::restore_from_savepoint
      ->  PCursor::resume()
         -> m_mtr->start();
            re-acquires the latch
         -> restore_position()
            restores the cursor
          -> may need to move to the next page
             m_pcur->move_to_next_on_page()
      -> btr_pcur_t::is_on_user_rec  (DB_SUCCESS : move_to_user_rec())
          is this a user record? if not, move to a user record



ddl::Loader::Task::operator() is the main overloaded call operator (ddl0builder.cc:2138)
  ->ddl::Builder::merge_sort
     merge-sorts the temporary file, producing a new temporary file
     connectio 1479 1552   mysql   48u      REG               8,16   10670080    3694322 /pxc/mysql8036/tmp/#3694322 (deleted)
     connectio 1479 1552   mysql   49u      REG               8,16   10670080    3694366 /pxc/mysql8036/tmp/#3694366 (deleted)
     two temporary files exist during the merge;
     after the merge only one remains:
     connectio 1479 1552   mysql   49u      REG               8,16   10670080    3694366 /pxc/mysql8036/tmp/#3694366 (deleted)
  ->ddl::Builder::btree_build  ddl0builder.cc:1755
     rebuilds the index from the merged file
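The merge step is ordinary external merge sort over already-sorted runs; a minimal sketch (illustrative, not server code):

```python
# Minimal sketch of the merge step: the sorted runs spilled by
# key_buffer->serialize() are k-way merged into one sorted stream, which
# the bottom-up btree_build pass can then load sequentially.
import heapq

def merge_runs(runs):
    # heapq.merge lazily merges inputs that are each already sorted
    return list(heapq.merge(*runs))

merged = merge_runs([[1, 4, 9], [2, 3], [0, 5]])
```

In the real code the runs live in the temporary file rather than in memory, which is why two temp files coexist during the merge: one holding the input runs and one receiving the merged output.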


(gdb) info b
Num     Type           Disp Enb Address            What
1       breakpoint     keep y   0x00000000030fabc5 in main(int, char**) at /pxc/mysql-8.0.36/sql/main.cc:25
        breakpoint already hit 1 time
2       breakpoint     keep y   0x0000000004f3473e in operator()() const at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0par-scan.cc:238
        breakpoint already hit 18 times


ddl::Cursor

        
innodb_parallel_read_threads=1
innodb_ddl_threads =1
innodb_purge_threads = 1

     

First adjust the parallelism

CREATE TABLE t1 (pk CHAR(5) PRIMARY KEY);
INSERT INTO t1 VALUES ('aaaaa'), ('bbbbb'), ('bbbcc'), ('ccccc'), ('ddddd'), ('eeeee');

1. Make the key buffer fill after 2 rows
SET DEBUG='+d,ddl_buf_add_two';


2. Delete a row, with the purge thread stopped

set global innodb_purge_stop_now=ON;
DELETE FROM t1 WHERE pk = 'bbbcc';


3. Run the DDL, and break at ddl0par-scan.cc:238

ALTER TABLE t1 ENGINE=InnoDB, ALGORITHM=INPLACE


4. Resume purge

SET GLOBAL innodb_purge_run_now=ON;


5. Let the DDL continue (data lost)









Breakpoint 6, trx_purge_attach_undo_recs (n_purge_threads=1, batch_size=300) at /pxc/mysql-8.0.36/storage/innobase/trx/trx0purge.cc:2303
2303        purge_groups.add(rec);
(gdb) info b
Num     Type           Disp Enb Address            What
1       breakpoint     keep y   0x00000000030fabc5 in main(int, char**) at /pxc/mysql-8.0.36/sql/main.cc:25
        breakpoint already hit 1 time
2       breakpoint     keep y   0x0000000004c1dc6c in row_purge_record_func(purge_node_t*, trx_undo_rec_t*, que_thr_t const*, bool, THD*) at /pxc/mysql-8.0.36/storage/innobase/row/row0purge.cc:1079
        breakpoint already hit 29 times
5       breakpoint     keep y   0x0000000004c1e373 in row_purge_step(que_thr_t*) at /pxc/mysql-8.0.36/storage/innobase/row/row0purge.cc:1238
        breakpoint already hit 28 times
6       breakpoint     keep y   0x0000000004cc71f1 in trx_purge_attach_undo_recs(ulint, ulint) at /pxc/mysql-8.0.36/storage/innobase/trx/trx0purge.cc:2303
        breakpoint already hit 28 times
7       breakpoint     keep y   0x0000000004c1bb80 in row_purge_remove_clust_if_poss(purge_node_t*) at /pxc/mysql-8.0.36/storage/innobase/row/row0purge.cc:243
8       breakpoint     keep y   0x0000000004a5da4d in btr_cur_optimistic_delete(btr_cur_t*, unsigned long, mtr_t*) at /pxc/mysql-8.0.36/storage/innobase/include/btr0cur.h:466
        breakpoint already hit 1 time



First adjust the parallelism

CREATE TABLE t1 (pk CHAR(5) PRIMARY KEY);
INSERT INTO t1 VALUES ('aaaaa'), ('bbbbb'), ('ccccc'), ('ddddd'), ('eeeee');

1. Make the key buffer fill after 2 rows
SET DEBUG='+d,ddl_buf_add_two';

2. In S2, run the DDL and break at ddl0par-scan.cc:238

ALTER TABLE t1 ENGINE=InnoDB, ALGORITHM=INPLACE


3. In S1, insert

begin;
INSERT INTO t1 VALUES ('bbbcc');


4. Let the DDL continue: duplicate-row error







#0  ddl::Builder::add_to_key_buffer (this=0xbcc5cb0, ctx=..., mv_rows_added=@0x7fffa06f0458: 0) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0builder.cc:1356
#1  0x0000000005040436 in ddl::Builder::bulk_add_row(ddl::Cursor&, ddl::Row&, unsigned long, std::function<dberr_t ()>&&) (this=0xbcc5cb0, cursor=..., row=..., thread_id=0, latch_release=<unknown type in /pxc/mysql8036/bin/mysqld, CU 0x11a2629e, DIE 0x11abe361>) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0builder.cc:1440
#2  0x0000000005040c76 in ddl::Builder::add_row(ddl::Cursor&, ddl::Row&, unsigned long, std::function<dberr_t ()>&&) (this=0xbcc5cb0, cursor=..., row=..., thread_id=0, latch_release=<unknown type in /pxc/mysql8036/bin/mysqld, CU 0x11a2629e, DIE 0x11abe2dc>) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0builder.cc:1593
#3  0x0000000004f34819 in operator() (__closure=0x7fffa06f1260, thread_ctx=0xbb216d0, row=...) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0par-scan.cc:234
#4  0x0000000004f34e2d in operator() (__closure=0xbcb48f0, read_ctx=0xbaf2760) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0par-scan.cc:364
#5  0x0000000004f3722c in std::__invoke_impl<dberr_t, ddl::Parallel_cursor::scan(ddl::Builders&)::<lambda(const Parallel_reader::Ctx*)>&, const Parallel_reader::Ctx*>(std::__invoke_other, struct {...} &) (__f=...) at /opt/rh/devtoolset-11/root/usr/include/c++/11/bits/invoke.h:61
#6  0x0000000004f36b22 in std::__invoke_r<dberr_t, ddl::Parallel_cursor::scan(ddl::Builders&)::<lambda(const Parallel_reader::Ctx*)>&, const Parallel_reader::Ctx*>(struct {...} &) (__fn=...) at /opt/rh/devtoolset-11/root/usr/include/c++/11/bits/invoke.h:114
#7  0x0000000004f363fc in std::_Function_handler<dberr_t(const Parallel_reader::Ctx*), ddl::Parallel_cursor::scan(ddl::Builders&)::<lambda(const Parallel_reader::Ctx*)> >::_M_invoke(const std::_Any_data &, <unknown type in /pxc/mysql8036/bin/mysqld, CU 0x10badb3e, DIE 0x10c41e8b>) (__functor=..., __args#0=<unknown type in /pxc/mysql8036/bin/mysqld, CU 0x10badb3e, DIE 0x10c41e8b>) at /opt/rh/devtoolset-11/root/usr/include/c++/11/bits/std_function.h:290
#8  0x0000000004c116e7 in std::function<dberr_t (Parallel_reader::Ctx const*)>::operator()(Parallel_reader::Ctx const*) const (this=0xbcb48a8, __args#0=0xbaf2760) at /opt/rh/devtoolset-11/root/usr/include/c++/11/bits/std_function.h:590
#9  0x0000000004c0bcea in Parallel_reader::Ctx::traverse_recs (this=0xbaf2760, pcursor=0x7fffa06f0af0, mtr=0x7fffa06f0b10) at /pxc/mysql-8.0.36/storage/innobase/row/row0pread.cc:752
#10 0x0000000004c0b7c4 in Parallel_reader::Ctx::traverse (this=0xbaf2760) at /pxc/mysql-8.0.36/storage/innobase/row/row0pread.cc:633
#11 0x0000000004c0c1cc in Parallel_reader::worker (this=0x7fffa06f1290, thread_ctx=0xbb216d0) at /pxc/mysql-8.0.36/storage/innobase/row/row0pread.cc:868
#12 0x0000000004c0dabb in Parallel_reader::parallel_read (this=0x7fffa06f1290) at /pxc/mysql-8.0.36/storage/innobase/row/row0pread.cc:1293
#13 0x0000000004c0de2e in Parallel_reader::spawn (this=0x7fffa06f1290, n_threads=0) at /pxc/mysql-8.0.36/storage/innobase/row/row0pread.cc:1347
#14 0x0000000004c0dec1 in Parallel_reader::run (this=0x7fffa06f1290, n_threads=0) at /pxc/mysql-8.0.36/storage/innobase/row/row0pread.cc:1362
#15 0x0000000004f35644 in ddl::Parallel_cursor::scan (this=0xbcc5db0, builders=std::vector of length 1, capacity 1 = {...}) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0par-scan.cc:372
#16 0x0000000004f289ee in ddl::Loader::scan_and_build_indexes (this=0x7fffa06f19a0) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0loader.cc:448
#17 0x0000000004f28b15 in ddl::Loader::build_all (this=0x7fffa06f19a0) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0loader.cc:480
#18 0x0000000004f1130c in ddl::Context::build (this=0x7fffa06f1a40) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0ctx.cc:510
#19 0x0000000004a26f19 in ha_innobase::inplace_alter_table_impl<dd::Table> (this=0xbcc9ef0, altered_table=0xbcb1090, ha_alter_info=0x7fffa06f26e0) at /pxc/mysql-8.0.36/storage/innobase/handler/handler0alter.cc:6360
#20 0x00000000049ff3fe in ha_innobase::inplace_alter_table (this=0xbcc9ef0, altered_table=0xbcb1090, ha_alter_info=0x7fffa06f26e0, old_dd_tab=0xbaf3740, new_dd_tab=0xb1b66b0) at /pxc/mysql-8.0.36/storage/innobase/handler/handler0alter.cc:1557
#21 0x00000000033d5695 in handler::ha_inplace_alter_table (this=0xbcc9ef0, altered_table=0xbcb1090, ha_alter_info=0x7fffa06f26e0, old_table_def=0xbaf3740, new_table_def=0xb1b66b0) at /pxc/mysql-8.0.36/sql/handler.h:6180





#0  ddl::Loader::Task::operator() (this=0x7fff8f7ec840) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0builder.cc:2138
#1  0x0000000004f2a43e in ddl::Loader::Task_queue::mt_execute (this=0x7fffb4502f90) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0loader.cc:177
#2  0x0000000004f29ed2 in ddl::Loader::Task_queue::execute (this=0x7fffb4502f90) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0loader.cc:76
#3  0x0000000004f2e363 in std::__invoke_impl<dberr_t, dberr_t (ddl::Loader::Task_queue::*&)(), ddl::Loader::Task_queue*&> (__f=@0x7fff8f7ec990: (dberr_t (ddl::Loader::Task_queue::*)(ddl::Loader::Task_queue * const)) 0x4f29e6e <ddl::Loader::Task_queue::execute()>, __t=@0x7fff8f7ec9a0: 0x7fffb4502f90) at /opt/rh/devtoolset-11/root/usr/include/c++/11/bits/invoke.h:74
#4  0x0000000004f2dba9 in std::__invoke<dberr_t (ddl::Loader::Task_queue::*&)(), ddl::Loader::Task_queue*&> (__fn=@0x7fff8f7ec990: (dberr_t (ddl::Loader::Task_queue::*)(ddl::Loader::Task_queue * const)) 0x4f29e6e <ddl::Loader::Task_queue::execute()>) at /opt/rh/devtoolset-11/root/usr/include/c++/11/bits/invoke.h:96
#5  0x0000000004f2ce1c in std::_Bind<dberr_t (ddl::Loader::Task_queue::*(ddl::Loader::Task_queue*))()>::__call<dberr_t, , 0ul>(std::tuple<>&&, std::_Index_tuple<0ul>) (this=0x7fff8f7ec990, __args=<unknown type in /pxc/mysql8036/bin/mysqld, CU 0x10a8a6d5, DIE 0x10b1f644>) at /opt/rh/devtoolset-11/root/usr/include/c++/11/functional:420
#6  0x0000000004f2c130 in std::_Bind<dberr_t (ddl::Loader::Task_queue::*(ddl::Loader::Task_queue*))()>::operator()<, dberr_t>() (this=0x7fff8f7ec990) at /opt/rh/devtoolset-11/root/usr/include/c++/11/functional:503
#7  0x0000000004f2ae45 in Runnable::operator()<dberr_t (ddl::Loader::Task_queue::*)(), ddl::Loader::Task_queue*&>(dberr_t (ddl::Loader::Task_queue::*&&)(), ddl::Loader::Task_queue*&) (this=0x7fff8f7ec9e8, f=<unknown type in /pxc/mysql8036/bin/mysqld, CU 0x10a8a6d5, DIE 0x10b24119>) at /pxc/mysql-8.0.36/storage/innobase/include/os0thread-create.h:155
#8  0x0000000004f276e7 in operator() (__closure=0x7fffb45265e0, seqnum=1) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0loader.cc:321
#9  0x0000000004f2978d in std::__invoke_impl<dberr_t, ddl::Loader::load()::<lambda(PSI_thread_seqnum)>, long unsigned int>(std::__invoke_other, <unknown type in /pxc/mysql8036/bin/mysqld, CU 0x10a8a6d5, DIE 0x10b1a212>) (__f=<unknown type in /pxc/mysql8036/bin/mysqld, CU 0x10a8a6d5, DIE 0x10b1a212>) at /opt/rh/devtoolset-11/root/usr/include/c++/11/bits/invoke.h:61
#10 0x0000000004f29726 in std::__invoke<ddl::Loader::load()::<lambda(PSI_thread_seqnum)>, long unsigned int>(<unknown type in /pxc/mysql8036/bin/mysqld, CU 0x10a8a6d5, DIE 0x10b1a330>) (__fn=<unknown type in /pxc/mysql8036/bin/mysqld, CU 0x10a8a6d5, DIE 0x10b1a330>) at /opt/rh/devtoolset-11/root/usr/include/c++/11/bits/invoke.h:96
#11 0x0000000004f29697 in std::thread::_Invoker<std::tuple<ddl::Loader::load()::<lambda(PSI_thread_seqnum)>, long unsigned int> >::_M_invoke<0, 1>(std::_Index_tuple<0, 1>) (this=0x7fffb45265d8) at /opt/rh/devtoolset-11/root/usr/include/c++/11/bits/std_thread.h:253
#12 0x0000000004f29652 in std::thread::_Invoker<std::tuple<ddl::Loader::load()::<lambda(PSI_thread_seqnum)>, long unsigned int> >::operator()(void) (this=0x7fffb45265d8) at /opt/rh/devtoolset-11/root/usr/include/c++/11/bits/std_thread.h:260
#13 0x0000000004f29636 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<ddl::Loader::load()::<lambda(PSI_thread_seqnum)>, long unsigned int> > >::_M_run(void) (this=0x7fffb45265d0) at /opt/rh/devtoolset-11/root/usr/include/c++/11/bits/std_thread.h:211
#14 0x00007ffff6c12fdf in execute_native_thread_routine () at ../../../../../libstdc++-v3/src/c++11/thread.cc:80
#15 0x00007ffff7bc6ea5 in start_thread () from /lib64/libpthread.so.0
#16 0x00007ffff6370b0d in clone () from /lib64/libc.so.6




#0  ddl::Loader::Task::Task (this=0x7fffa05ef3b0, builder=0x7fffb4580bb0, thread_id=0) at /pxc/mysql-8.0.36/storage/innobase/include/ddl0impl-loader.h:58
#1  0x0000000005042043 in ddl::Builder::create_merge_sort_tasks (this=0x7fffb4580bb0) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0builder.cc:1901
#2  0x0000000005042972 in ddl::Builder::setup_sort (this=0x7fffb4580bb0) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0builder.cc:2030
#3  0x0000000005042eb1 in ddl::Loader::Task::operator() (this=0x7fffa05ef4c0) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0builder.cc:2141
#4  0x0000000004f2a5aa in ddl::Loader::Task_queue::st_execute (this=0x7fffb42e59d0) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0loader.cc:205
#5  0x0000000004f29eef in ddl::Loader::Task_queue::execute (this=0x7fffb42e59d0) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0loader.cc:79
#6  0x0000000004f279cc in ddl::Loader::load (this=0x7fffa05ef9a0) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0loader.cc:341
#7  0x0000000004f28a78 in ddl::Loader::scan_and_build_indexes (this=0x7fffa05ef9a0) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0loader.cc:460
#8  0x0000000004f28b15 in ddl::Loader::build_all (this=0x7fffa05ef9a0) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0loader.cc:480
#9  0x0000000004f1130c in ddl::Context::build (this=0x7fffa05efa40) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0ctx.cc:510
#10 0x0000000004a26f19 in ha_innobase::inplace_alter_table_impl<dd::Table> (this=0x7fffb458b7f0, altered_table=0x7fffb4557cf0, ha_alter_info=0x7fffa05f06e0) at /pxc/mysql-8.0.36/storage/innobase/handler/handler0alter.cc:6360




Adding the row to the key buffer:

#0  ddl::Builder::add_row(ddl::Cursor&, ddl::Row&, unsigned long, std::function<dberr_t ()>&&) (this=0x7fffb45314f0, cursor=..., row=..., thread_id=0, latch_release=<unknown type in /pxc/mysql8036/bin/mysqld, CU 0x11a2629e, DIE 0x11abe2dc>) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0builder.cc:1579
#1  0x0000000004f34819 in operator() (__closure=0x7fffa05ef260, thread_ctx=0x7fffb4500630, row=...) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0par-scan.cc:234
#2  0x0000000004f34e2d in operator() (__closure=0x7fffb4311970, read_ctx=0x7fffb4504670) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0par-scan.cc:364
#3  0x0000000004f3722c in std::__invoke_impl<dberr_t, ddl::Parallel_cursor::scan(ddl::Builders&)::<lambda(const Parallel_reader::Ctx*)>&, const Parallel_reader::Ctx*>(std::__invoke_other, struct {...} &) (__f=...) at /opt/rh/devtoolset-11/root/usr/include/c++/11/bits/invoke.h:61
#4  0x0000000004f36b22 in std::__invoke_r<dberr_t, ddl::Parallel_cursor::scan(ddl::Builders&)::<lambda(const Parallel_reader::Ctx*)>&, const Parallel_reader::Ctx*>(struct {...} &) (__fn=...) at /opt/rh/devtoolset-11/root/usr/include/c++/11/bits/invoke.h:114
#5  0x0000000004f363fc in std::_Function_handler<dberr_t(const Parallel_reader::Ctx*), ddl::Parallel_cursor::scan(ddl::Builders&)::<lambda(const Parallel_reader::Ctx*)> >::_M_invoke(const std::_Any_data &, <unknown type in /pxc/mysql8036/bin/mysqld, CU 0x10badb3e, DIE 0x10c41e8b>) (__functor=..., __args#0=<unknown type in /pxc/mysql8036/bin/mysqld, CU 0x10badb3e, DIE 0x10c41e8b>) at /opt/rh/devtoolset-11/root/usr/include/c++/11/bits/std_function.h:290
#6  0x0000000004c116e7 in std::function<dberr_t (Parallel_reader::Ctx const*)>::operator()(Parallel_reader::Ctx const*) const (this=0x7fffb4541db8, __args#0=0x7fffb4504670) at /opt/rh/devtoolset-11/root/usr/include/c++/11/bits/std_function.h:590
#7  0x0000000004c0bcea in Parallel_reader::Ctx::traverse_recs (this=0x7fffb4504670, pcursor=0x7fffa05eeaf0, mtr=0x7fffa05eeb10) at /pxc/mysql-8.0.36/storage/innobase/row/row0pread.cc:752
#8  0x0000000004c0b7c4 in Parallel_reader::Ctx::traverse (this=0x7fffb4504670) at /pxc/mysql-8.0.36/storage/innobase/row/row0pread.cc:633
#9  0x0000000004c0c1cc in Parallel_reader::worker (this=0x7fffa05ef290, thread_ctx=0x7fffb4500630) at /pxc/mysql-8.0.36/storage/innobase/row/row0pread.cc:868
#10 0x0000000004c0dabb in Parallel_reader::parallel_read (this=0x7fffa05ef290) at /pxc/mysql-8.0.36/storage/innobase/row/row0pread.cc:1293
#11 0x0000000004c0de2e in Parallel_reader::spawn (this=0x7fffa05ef290, n_threads=0) at /pxc/mysql-8.0.36/storage/innobase/row/row0pread.cc:1347
#12 0x0000000004c0dec1 in Parallel_reader::run (this=0x7fffa05ef290, n_threads=0) at /pxc/mysql-8.0.36/storage/innobase/row/row0pread.cc:1362


Parallel_reader::Scan_ctx::Scan_ctx is initialized here; its callback is a lambda that is invoked for every scanned row to process it:

  auto err = reader.add_scan(        // initialization
      /* Ignore read views for non-online scans. */
      m_ctx.m_online ? m_ctx.m_trx : nullptr, config,
      [&](const Parallel_reader::Ctx *read_ctx) { // callback: processes each row
        const auto thread_id = read_ctx->thread_id();

        auto &row = rows[thread_id];
        auto heap = m_heaps[thread_id];

Inside the callback, bulk_inserter is invoked to insert the row into the key buffer.

The following is then called:


#0  Parallel_reader::Scan_ctx::Scan_ctx(Parallel_reader*, unsigned long, trx_t*, Parallel_reader::Config const&, std::function<dberr_t (Parallel_reader::Ctx const*)>&&) (this=0xbf00420, reader=0x7fffa06f1290, id=0, trx=0x7fffe5674ff8, config=..., f=<unknown type in /pxc/mysql8036/bin/mysqld, CU 0xe38c548, DIE 0xe429b83>) at /pxc/mysql-8.0.36/storage/innobase/row/row0pread.cc:204
#1  0x0000000004c1259e in ut::new_withkey<Parallel_reader::Scan_ctx, Parallel_reader*, unsigned long&, trx_t*&, Parallel_reader::Config const&, std::function<dberr_t (Parallel_reader::Ctx const*)> >(ut::PSI_memory_key_t, Parallel_reader*&&, unsigned long&, trx_t*&, Parallel_reader::Config const&, std::function<dberr_t (Parallel_reader::Ctx const*)>&&) (key=...) at /pxc/mysql-8.0.36/storage/innobase/include/ut0new.h:754
#2  0x0000000004c0e0db in Parallel_reader::add_scan(trx_t*, Parallel_reader::Config const&, std::function<dberr_t (Parallel_reader::Ctx const*)>&&) (this=0x7fffa06f1290, trx=0x7fffe5674ff8, config=..., f=<unknown type in /pxc/mysql8036/bin/mysqld, CU 0xe38c548, DIE 0xe428341>) at /pxc/mysql-8.0.36/storage/innobase/row/row0pread.cc:1399
#3  0x0000000004f35612 in ddl::Parallel_cursor::scan (this=0xbd78e50, builders=std::vector of length 1, capacity 1 = {...}) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0par-scan.cc:326
#4  0x0000000004f289ee in ddl::Loader::scan_and_build_indexes (this=0x7fffa06f19a0) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0loader.cc:448
#5  0x0000000004f28b15 in ddl::Loader::build_all (this=0x7fffa06f19a0) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0loader.cc:480
#6  0x0000000004f1130c in ddl::Context::build (this=0x7fffa06f1a40) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0ctx.cc:510
#7  0x0000000004a26f19 in ha_innobase::inplace_alter_table_impl<dd::Table> (this=0xbeca3c0, altered_table=0xbedd420, ha_alter_info=0x7fffa06f26e0) at /pxc/mysql-8.0.36/storage/innobase/handler/handler0alter.cc:6360
#8  0x00000000049ff3fe in ha_innobase::inplace_alter_table (this=0xbeca3c0, altered_table=0xbedd420, ha_alter_info=0x7fffa06f26e0, old_dd_tab=0xbb32430, new_dd_tab=0xbd649c0) at /pxc/mysql-8.0.36/storage/innobase/handler/handler0alter.cc:1557
#9  0x00000000033d5695 in handler::ha_inplace_alter_table (this=0xbeca3c0, altered_table=0xbedd420, ha_alter_info=0x7fffa06f26e0, old_table_def=0xbb32430, new_table_def=0xbd649c0) at /pxc/mysql-8.0.36/sql/handler.h:6180
#10 0x00000000033b7f2d in mysql_inplace_alter_table (thd=0xb03b540, schema=..., new_schema=..., table_def=0xbb32430, altered_table_def=0xbd649c0, table_list=0xb154100, table=0xbed6360, altered_table=0xbedd420, ha_alter_info=0x7fffa06f26e0, inplace_supported=HA_ALTER_INPLACE_NO_LOCK_AFTER_PREPARE, alter_ctx=0x7fffa06f3600, columns=std::set with 0 elements, fk_key_info=0xbd96100, fk_key_count=0, fk_invalidator=0x7fffa06f3530) at /pxc/mysql-8.0.36/sql/sql_table.cc:13533
#11 0x00000000033c40a8 in mysql_alter_table (thd=0xb03b540, new_db=0xb153648 "testrep", new_name=0x0, create_info=0x7fffa06f5150, table_list=0xb154100, alter_info=0x7fffa06f4fe0) at /pxc/mysql-8.0.36/sql/sql_table.cc:17501
#12 0x00000000039eb610 in Sql_cmd_alter_table::execute (this=0xb154a18, thd=0xb03b540) at /pxc/mysql-8.0.36/sql/sql_alter.cc:349
#13 0x00000000032ddb84 in mysql_execute_command (thd=0xb03b540, first_level=true) at /pxc/mysql-8.0.36/sql/sql_parse.cc:4721
#14 0x00000000032dfe21 in dispatch_sql_command (thd=0xb03b540, parser_state=0x7fffa06f6910) at /pxc/mysql-8.0.36/sql/sql_parse.cc:5370
#15 0x00000000032d5b61 in dispatch_command (thd=0xb03b540, com_data=0x7fffa06f7a00, command=COM_QUERY) at /pxc/mysql-8.0.36/sql/sql_parse.cc:2054
#16 0x00000000032d3aad in do_command (thd=0xb03b540) at /pxc/mysql-8.0.36/sql/sql_parse.cc:1439
#17 0x00000000034f39d5 in handle_connection (arg=0xb039b50) at /pxc/mysql-8.0.36/sql/conn_handler/connection_handler_per_thread.cc:302
#18 0x000000000518ad94 in pfs_spawn_thread (arg=0xafdeab0) at /pxc/mysql-8.0.36/storage/perfschema/pfs.cc:3042
#19 0x00007ffff7bc6ea5 in start_thread () from /lib64/libpthread.so.0
#20 0x00007ffff6370b0d in clone () from /lib64/libc.so.6
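The add_scan registration above boils down to a per-row callback pattern: the reader walks every record in range and hands it to the registered lambda, as the traverse_recs frames show. Below is a minimal standalone sketch of that pattern; ReadCtx, RowCallback and scan_rows are illustrative names, not the real InnoDB API.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Illustrative stand-in for Parallel_reader::Ctx: identifies the scanning
// worker (0 when single-threaded) and points at the current record.
struct ReadCtx {
  std::size_t thread_id;
  const std::string *rec;
};

// Per-row handler, analogous to the lambda passed to add_scan; a non-zero
// return aborts the scan, like a DB_* error code.
using RowCallback = std::function<int(const ReadCtx &)>;

// Scan all rows with one worker, invoking the callback once per record.
int scan_rows(const std::vector<std::string> &rows, const RowCallback &cb) {
  for (const auto &r : rows) {
    ReadCtx ctx{/*thread_id=*/0, &r};
    if (int err = cb(ctx); err != 0) return err;  // stop on first error
  }
  return 0;
}
```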









#0  Btree_load::insert (this=0x7fffd4343040, page_loader=0x7fffd43653b0, tuple=0x7fffd436bcb0, big_rec=0x0, rec_size=28) at /pxc/mysql-8.0.36/storage/innobase/btr/btr0load.cc:1083
#1  0x0000000004e20ae4 in Btree_load::insert (this=0x7fffd4343040, tuple=0x7fffd436bcb0, level=0) at /pxc/mysql-8.0.36/storage/innobase/btr/btr0load.cc:1180
#2  0x0000000004e21125 in Btree_load::build (this=0x7fffd4343040, cursor=...) at /pxc/mysql-8.0.36/storage/innobase/btr/btr0load.cc:1307
#3  0x000000000503fb2f in ddl::Builder::insert_direct (this=0x7fffd435f030, cursor=..., thread_id=0) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0builder.cc:1303
#4  0x000000000504075b in ddl::Builder::bulk_add_row(ddl::Cursor&, ddl::Row&, unsigned long, std::function<dberr_t ()>&&) (this=0x7fffd435f030, cursor=..., row=..., thread_id=0, latch_release=<unknown type in /pxc/mysql8036/bin/mysqld, CU 0x11a2629e, DIE 0x11abe361>) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0builder.cc:1488
#5  0x0000000005040c76 in ddl::Builder::add_row(ddl::Cursor&, ddl::Row&, unsigned long, std::function<dberr_t ()>&&) (this=0x7fffd435f030, cursor=..., row=..., thread_id=0, latch_release=<unknown type in /pxc/mysql8036/bin/mysqld, CU 0x11a2629e, DIE 0x11abe2dc>) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0builder.cc:1593
#6  0x0000000004f34819 in operator() (__closure=0x7fffa06f1260, thread_ctx=0x7fffd433edf0, row=...) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0par-scan.cc:234
#7  0x0000000004f34e2d in operator() (__closure=0x7fffd4333d90, read_ctx=0x7fffd436e740) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0par-scan.cc:364
#8  0x0000000004f3722c in std::__invoke_impl<dberr_t, ddl::Parallel_cursor::scan(ddl::Builders&)::<lambda(const Parallel_reader::Ctx*)>&, const Parallel_reader::Ctx*>(std::__invoke_other, struct {...} &) (__f=...) at /opt/rh/devtoolset-11/root/usr/include/c++/11/bits/invoke.h:61
#9  0x0000000004f36b22 in std::__invoke_r<dberr_t, ddl::Parallel_cursor::scan(ddl::Builders&)::<lambda(const Parallel_reader::Ctx*)>&, const Parallel_reader::Ctx*>(struct {...} &) (__fn=...) at /opt/rh/devtoolset-11/root/usr/include/c++/11/bits/invoke.h:114
#10 0x0000000004f363fc in std::_Function_handler<dberr_t(const Parallel_reader::Ctx*), ddl::Parallel_cursor::scan(ddl::Builders&)::<lambda(const Parallel_reader::Ctx*)> >::_M_invoke(const std::_Any_data &, <unknown type in /pxc/mysql8036/bin/mysqld, CU 0x10badb3e, DIE 0x10c41e8b>) (__functor=..., __args#0=<unknown type in /pxc/mysql8036/bin/mysqld, CU 0x10badb3e, DIE 0x10c41e8b>) at /opt/rh/devtoolset-11/root/usr/include/c++/11/bits/std_function.h:290
#11 0x0000000004c116e7 in std::function<dberr_t (Parallel_reader::Ctx const*)>::operator()(Parallel_reader::Ctx const*) const (this=0x7fffd436b838, __args#0=0x7fffd436e740) at /opt/rh/devtoolset-11/root/usr/include/c++/11/bits/std_function.h:590
#12 0x0000000004c0bcea in Parallel_reader::Ctx::traverse_recs (this=0x7fffd436e740, pcursor=0x7fffa06f0af0, mtr=0x7fffa06f0b10) at /pxc/mysql-8.0.36/storage/innobase/row/row0pread.cc:752
#13 0x0000000004c0b7c4 in Parallel_reader::Ctx::traverse (this=0x7fffd436e740) at /pxc/mysql-8.0.36/storage/innobase/row/row0pread.cc:633
#14 0x0000000004c0c1cc in Parallel_reader::worker (this=0x7fffa06f1290, thread_ctx=0x7fffd433edf0) at /pxc/mysql-8.0.36/storage/innobase/row/row0pread.cc:868
#15 0x0000000004c0dabb in Parallel_reader::parallel_read (this=0x7fffa06f1290) at /pxc/mysql-8.0.36/storage/innobase/row/row0pread.cc:1293
#16 0x0000000004c0de2e in Parallel_reader::spawn (this=0x7fffa06f1290, n_threads=0) at /pxc/mysql-8.0.36/storage/innobase/row/row0pread.cc:1347
#17 0x0000000004c0dec1 in Parallel_reader::run (this=0x7fffa06f1290, n_threads=0) at /pxc/mysql-8.0.36/storage/innobase/row/row0pread.cc:1362
#18 0x0000000004f35644 in ddl::Parallel_cursor::scan (this=0x7fffd435c830, builders=std::vector of length 2, capacity 2 = {...}) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0par-scan.cc:372
#19 0x0000000004f289ee in ddl::Loader::scan_and_build_indexes (this=0x7fffa06f19a0) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0loader.cc:448
#20 0x0000000004f28b15 in ddl::Loader::build_all (this=0x7fffa06f19a0) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0loader.cc:480
#21 0x0000000004f1130c in ddl::Context::build (this=0x7fffa06f1a40) at /pxc/mysql-8.0.36/storage/innobase/ddl/ddl0ctx.cc:510
#22 0x0000000004a26f19 in ha_innobase::inplace_alter_table_impl<dd::Table> (this=0x7fffd437fc00, altered_table=0x7fffd43812c0, ha_alter_info=0x7fffa06f26e0) at /pxc/mysql-8.0.36/storage/innobase/handler/handler0alter.cc:6360





----------------------
1、class Parallel_reader
class Parallel_reader

---Parallel_reader::Ctx
     private:
        size_t m_id{std::numeric_limits<size_t>::max()}; /** Context ID. */
        bool m_split{};                                                  /** If true then split the context at the block level. */
        Scan_ctx::Range m_range{};                            /** Range to read in this context. */
        Scan_ctx *m_scan_ctx{};                                 /** Scanner context. */
     public:
        Thread_ctx *m_thread_ctx{};                             /** Context information related to executing thread ID. */
        const buf_block_t *m_block{};                           /** Current block. */
        const rec_t *m_rec{};                                        /** Current row. */
        size_t m_n_pages{};                                         /** Number of pages traversed by the context. */
        bool m_first_rec{true};                                      /** True if m_rec is the first record in the page. */
        ulint *m_offsets{};
        bool m_start{};                                                 /** Start of a new range to scan. */

---Parallel_reader::Scan_ctx
       private:
         struct Iter {
           mem_heap_t *m_heap{};                                 /** Heap used to allocate m_rec, m_tuple and m_pcur. */
           const ulint *m_offsets{};                                   /** m_rec column offsets. */
           const rec_t *m_rec{};                                      /** Start scanning from this key. Raw data of the row. */
           const dtuple_t *m_tuple{};                                /** Tuple representation inside m_rec, for two Iter instances in a range m_tuple will be [first->m_tuple, second->m_tuple). */
           btr_pcur_t *m_pcur{};                                     /** Persistent cursor.*/
           };
         using Savepoint = std::pair<ulint, buf_block_t *>;
         using Savepoints = std::vector<Savepoint, ut::allocator<Savepoint>>;
         using Range = std::pair<std::shared_ptr<Iter>, std::shared_ptr<Iter>>;
         using Ranges = std::vector<Range, ut::allocator<Range>>;
         using Config = Parallel_reader::Config;
         
         size_t m_id{std::numeric_limits<size_t>::max()}; /** Context ID. */
         Config m_config;                                             /** Parallel scan configuration. */
         const trx_t *m_trx{};                                         /** Covering transaction. */
         F m_f;                                                            /** Callback function. */
         size_t m_depth{};                                            /** Depth of the Btree. */
         Parallel_reader *m_reader{};                           /** The parallel reader. */
         mutable std::atomic<dberr_t> m_err{DB_SUCCESS};  /** Error during parallel read. */
         std::atomic_size_t m_s_locks{};                       /** Number of threads that have S locked the index. */

---struct Thread_ctx
    size_t m_thread_id{std::numeric_limits<size_t>::max()};  /** Thread ID. */
    void *m_callback_ctx{};  /** Callback information related to the thread.@note Needs to be created and destroyed by the callback itself. */
    mem_heap_t *m_blob_heap{};   /** BLOB heap per thread. */
    State m_state{State::UNKNOWN}; /** Worker thread state. */
    PCursor *m_pcursor{};  /** Current persistent cursor. */

---struct Scan_range
    const dtuple_t *m_start{}; /** Start of the scan, can be nullptr for -infinity. */
    const dtuple_t *m_end{};  /** End of the scan, can be null for +infinity. */
    
---struct Config
    const Scan_range m_scan_range; /** Range to scan. */
    dict_index_t *m_index{}; /** (Cluster) Index in table to scan. */
    const bool m_is_compact{}; /** Row format of table. */
    const page_size_t m_page_size; /** Tablespace page size. */
    size_t m_read_level{0}; /** Btree level from which records need to be read. */
    size_t m_partition_id{std::numeric_limits<size_t>::max()}; /** Partition id if the index to be scanned belongs to a partitioned table, else std::numeric_limits<size_t>::max(). */

   
                                                                                                                                                     
         size_t m_max_threads{};           /** Maximum number of worker threads to use. */                                                                                  
         size_t m_n_threads{0};              /** Number of worker threads that will be spawned. */                                                                             
         mutable ib_mutex_t m_mutex;   /** Mutex protecting m_ctxs. */                                                                               
         Ctxs m_ctxs{};                            /** Contexts that must be executed. */                                                                                                                                                   
         Scan_ctxs m_scan_ctxs{};        /** Scan contexts. */                                                                                       
         os_event_t m_event{};               /** For signalling worker threads about events. */                                                      
         uint64_t m_sig_count;               /** Value returned by previous call of os_event_reset() on m_event. */                                  
         size_t m_scan_ctx_id{};           /** Counter for allocating scan context IDs. */                                                           
         std::atomic_size_t m_ctx_id{}; /** Context ID. Monotonically increasing ID. */                                                              
         std::atomic_size_t m_n_completed{};  /** Total tasks executed so far. */
         Start m_start_callback{};             /** Callback at start (before processing any rows). */
         Finish m_finish_callback{};           /** Callback at end (after processing all rows). */
         std::atomic<dberr_t> m_err{DB_SUCCESS}; /** Error during parallel read. */
         std::vector<IB_thread, ut::allocator<IB_thread>> m_parallel_read_threads;  /** List of threads used for parallel_read purpose. */
         static std::atomic_size_t s_active_threads;   /** Number of threads currently doing parallel reads. */
         bool m_sync; /** If the caller wants to wait for the parallel_read to finish its run. */
         std::vector<Thread_ctx *, ut::allocator<Thread_ctx *>> m_thread_ctxs;  /** Context information related to each parallel reader thread. */
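Even with innodb_parallel_read_threads=1, the scan still goes through the Ctx partitioning and worker machinery listed above, which is why single-threaded settings do not bypass the parallel code path. A hedged sketch of that split follows; partition/run are illustrative names, and the real partitioning in row0pread.cc works on B-tree subtrees rather than flat page counts.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Illustrative analogue of Parallel_reader::Ctx: a half-open [first, last)
// page range that one worker traverses.
struct Ctx {
  std::size_t first, last;
};

// Cut the scan into contexts of at most per_ctx pages each.
std::vector<Ctx> partition(std::size_t n_pages, std::size_t per_ctx) {
  std::vector<Ctx> ctxs;
  for (std::size_t p = 0; p < n_pages; p += per_ctx)
    ctxs.push_back({p, std::min(p + per_ctx, n_pages)});
  return ctxs;
}

// Execute all contexts (serially simulated here, as with one worker thread);
// returns the total pages traversed, the analogue of Ctx::m_n_pages.
std::size_t run(const std::vector<Ctx> &ctxs) {
  std::size_t pages = 0;
  for (const auto &c : ctxs) pages += c.last - c.first;
  return pages;
}
```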


2、 struct Builder
using Builders = std::vector<Builder *, ut::allocator<Builder *>>;
using IO_buffer = std::pair<byte *, os_offset_t>;
using Allocator = ut::allocator<Thread_ctx *>;  
using Thread_ctxs = std::vector<Thread_ctx *, Allocator>;

struct Builder
---enum class State
    INIT,                   /** Initial phase. */
    ADD,                  /** Collect the rows for the index to build. */
    SETUP_SORT,    /** Setup the merge sort and add the tasks to the task queue. */
    SORT,                /** Sort the collected rows, if required. The builder moves to state BTREE_BUILD after all sort tasks are completed successfully or there was an error during the sort phase. */
    BTREE_BUILD,  /** Build the btree. */
    FTS_SORT_AND_BUILD, /** FTS sort and build, this is done in one "step" */
    FINISH,             /** Finish the loading of the index. */
    STOP,               /** Stop on success. */
     ERROR            /** Stop on error. */
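For a non-FTS index the states above advance in a fixed order. A minimal sketch of that progression, as an assumed simplification that ignores the FTS_SORT_AND_BUILD path and error handling in ddl0builder.cc:

```cpp
#include <cassert>

// Builder states, mirroring the enum listed above.
enum class State { INIT, ADD, SETUP_SORT, SORT, BTREE_BUILD, FINISH, STOP, ERROR };

// Advance one step along the happy path; anything unexpected maps to ERROR.
State next(State s) {
  switch (s) {
    case State::INIT:        return State::ADD;
    case State::ADD:         return State::SETUP_SORT;
    case State::SETUP_SORT:  return State::SORT;
    case State::SORT:        return State::BTREE_BUILD;
    case State::BTREE_BUILD: return State::FINISH;
    case State::FINISH:      return State::STOP;
    default:                 return State::ERROR;
  }
}
```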
--Thread_ctx
   size_t m_id{};                                  /** Thread ID. */
   Key_sort_buffer *m_key_buffer{};     /** Key sort buffer. */
   size_t m_n_recs{};                          /** Total number of records added to the key sort buffer. */
   ddl::file_t m_file{};                        /** Merge file handle. */
   ut::unique_ptr_aligned<byte[]> m_aligned_buffer{};  /** Buffer to use for file writes. */
   IO_buffer m_io_buffer;                     /** Buffer to use for file writes. */
   Merge_offsets m_offsets{};               /** Record list starting offset in the output file. */
   RTree_inserter *m_rtree_inserter{};  /** For spatial/Rtree rows handling. */
   
     size_t m_id{};                                              /** Buffer ID. */
     std::atomic<State> m_state{State::INIT};        /** Builder state. */
     ddl::Context &m_ctx;                                    /** DDL Context. */  ---
     Loader &m_loader;                                      /** Loader that owns the instance. */ ---
     dict_index_t *m_index{};                               /** Index to create (if not FTS index). */
     const char *m_tmpdir{};                               /** Temporary file path. */
     Thread_ctxs m_thread_ctxs{};                       /** Per thread context. */
     dfield_t *m_prev_fields{};                              /** For tracking duplicates. */
     Dup m_clust_dup{};                                      /** For collecting duplicate entries (error reporting). */
     Scoped_heap m_v_heap{};                           /** Scoped virtual column heap. */
     Scoped_heap m_conv_heap{};                      /** Scoped conversion heap. */
     dict_index_t *m_sort_index{};                        /** The index to be built, FTS or non-FTS. */
     std::atomic<size_t> m_n_sort_tasks{};           /** Number of active sort tasks. */
     Btree_load *m_btr_load{};                             /** Cluster index bulk load instance to use, direct insert without a file sort. */ ---
     Alter_stage *m_local_stage{};                        /** Stage per builder. */
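The per-thread members above (m_key_buffer, m_file, m_offsets) implement the fill-and-spill behaviour described in section 3.1: clustered-index rows arrive already in PK order, so a full buffer can be appended straight to the new #sql-ibXXX file, while secondary-index rows are unsorted and must be sorted and spilled as a run into a temporary file for a later merge. A hedged sketch of that logic follows; KeyBuffer and its members are illustrative stand-ins, not the real Key_sort_buffer API.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

struct KeyBuffer {
  std::size_t capacity;                          // rows the buffer can hold
  bool is_clustered;                             // PK builder vs secondary index
  std::vector<std::string> rows;                 // in-memory key buffer
  std::vector<std::string> new_file;             // #sql-ibXXX analogue (clustered)
  std::vector<std::vector<std::string>> chunks;  // sorted temp-file runs (secondary)

  void flush() {
    if (rows.empty()) return;
    if (is_clustered) {
      // Already in PK order: append directly to the new data file.
      new_file.insert(new_file.end(), rows.begin(), rows.end());
    } else {
      // Unsorted: sort, then spill one run for the merge phase.
      std::sort(rows.begin(), rows.end());
      chunks.push_back(rows);
    }
    rows.clear();
  }

  void add(const std::string &key) {
    rows.push_back(key);
    if (rows.size() >= capacity) flush();  // buffer full: flush before next row
  }
};
```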


3、ddl::Context
(namespace ddl)struct Context

using Scan_buffer_size = std::pair<size_t, size_t>;

using Key_numbers = std::vector<size_t, ut::allocator<size_t>>;
using Indexes = std::vector<dict_index_t *, ut::allocator<dict_index_t *>>;
---struct FTS: full-text indexes are not considered here

    std::atomic<dberr_t> m_err{DB_SUCCESS};                              /** Common error code for all index builders running in parallel. */
    size_t m_err_key_number{std::numeric_limits<size_t>::max()};     /** Index where the error occurred. */
    trx_t *m_trx{};                                                                            /** Transaction covering the index build. */
    FTS m_fts;                                                                                /** The FTS builder. There is one FTS per table. */
    dict_table_t *m_old_table{};                                                        /** Source table, read rows from this table. */
    dict_table_t *m_new_table{};                                                       /** Table where indexes are created; identical to old_table unless creating a PRIMARY KEY. */
    bool m_online{};                                                                        /** True if creating index online. Non-online implies that we have an S latch on the table, therefore there can't be concurrent updates to the table while we are executing the DDL. We don't log the changes to the row log. */
    Indexes m_indexes{};                   /** Indexes to be created. */
    Key_numbers m_key_numbers{};           /** MySQL key numbers. */
    TABLE *m_table{};                      /** MySQL table for reporting errors/warnings. */
    const dtuple_t *m_add_cols{};          /** Default value for added columns or null. */
    const ulint *m_col_map{};              /** Mapping of old column numbers to new ones, or nullptr if none were added. */
    size_t m_add_autoinc{ULINT_UNDEFINED}; /** Number of added AUTO_INCREMENT columns, or ULINT_UNDEFINED if none added. */
    ddl::Sequence &m_sequence;             /** Autoinc sequence. */
    Alter_stage *m_stage{};                /** Performance schema accounting object, used by ALTER TABLE; stage->begin_phase_read_pk() is called at the start and the stage is passed on for further accounting. */
    const dict_add_v_col_t *m_add_v{};     /** New virtual columns added along with indexes. */
    TABLE *m_eval_table{};                 /** MySQL table used to evaluate virtual column values, see innobase_get_computed_value(). */
    bool m_skip_pk_sort{};                 /** Skip the sorting phase if true. */
    std::vector<size_t, ut::allocator<size_t>> m_nonnull{}; /** Non-null columns. */
    size_t m_n_uniq{};                     /** Number of unique columns in the key. */
    bool m_need_observer{};                /** true if a flush observer is needed. */
    Cursor *m_cursor{};                    /** Cursor for reading the clustered index. */
    size_t m_n_allocated{};                /** Number of bytes used. */
    const size_t m_max_buffer_size{};      /** Maximum number of bytes to use. */
    const size_t m_max_threads{};          /** Maximum number of threads to use. No parallel scan of the clustered index when FTS and/or virtual columns are involved; the build phase is parallel though. */
    ib_mutex_t m_autoinc_mutex;            /** For parallel access to the autoincrement generator. */
    mem_heap_t *m_dtuple_heap{};           /** Heap for copies of m_add_cols. */


4、ddl::Cursor
(namespace ddl) struct Cursor

using Post_row = std::function<dberr_t()>

   ddl::Context &m_ctx;              /** DDL context. */
  Scoped_heap m_row_heap{};  /** Scoped heap to use for rows. */
  Scoped_heap m_tuple_heap{}; /** Scoped heap to use for tuple instances. */
  dfield_t *m_prev_fields{};          /** Previous fields. */


5、ddl::Loader

(namespace ddl) class Loader
class Task_queue;

---struct Task
    Builder *m_builder{};                                                           /** Builder instance. */
    size_t m_thread_id{std::numeric_limits<size_t>::max()};        /** Thread state index. */
 
---Loader::Task_queue
    using Tasks = std::deque<Task, ut::allocator<Task>>;
    const Context &m_ctx;    /** DDL context. */
    bool m_sync{};               /** true if synchronous execution model. */
    Tasks m_tasks{};            /** The task queue. */
    ib_mutex_t m_mutex;      /** Mutex protecting m_tasks access. */
    os_event_t m_consumer_event{};  /** Task queue consumer event. */
    size_t m_n_threads{};                  /** Number of threads (including foreground thread). */
    size_t m_n_idle{};                       /** Number of threads idle. */


---Loader
         ddl::Context &m_ctx;     /** DDL context, shared by the loader threads. */
         bool m_parallel{};         /** If true then use parallel scan and index build. */
         size_t m_sort_buffer_size{};   /** Sort buffer size. */
         size_t m_io_buffer_size{};       /** IO buffer size. */
         Builders m_builders{};             /** Index builders. */
         Task_queue *m_taskq{};           /** Task queue. */
  
6、Key_sort_buffer : private ut::Non_copyable
  
  /** Callback for writing serialized data to disk. */
  using Function = std::function<dberr_t(IO_buffer io_buffer, os_offset_t &n)>;
  using DTuple = dfield_t *;
  using DTuples = std::vector<DTuple, ut::allocator<DTuple>>;

  mem_heap_t *m_heap{};                          /** Memory heap where allocated */
  dict_index_t *m_index{};                           /** The index the tuples belong to */
  size_t m_total_size{};                               /** Total amount of data bytes */
  size_t m_n_tuples{};                                /** Number of data tuples */
  size_t m_max_tuples{}; // maximum number of tuples   /** Maximum number of data tuples */
  DTuples m_dtuples{}; // memory for the tuple structs; how many rows fit is computed up front   /** Array of data tuples */
  size_t m_buffer_size{};                            /** Buffer size. */ // i.e. the size given by the DDL memory parameter (innodb_ddl_buffer_size)


7、 ddl::Parallel_cursor


  using Heaps = std::vector<mem_heap_t *, ut::allocator<mem_heap_t *>>;

  bool m_eof{};                               /** If true then no more rows to scan. */
  Heaps m_heaps{};                       /** Heap per thread. */
  dict_index_t *m_index{};               /** Index to iterate over. */
  bool m_single_threaded_mode{};  /** true if scan should be in single threaded mode. */



struct Row {

  /** Externally stored fields. */
  row_ext_t *m_ext{};

  /** Column offsets. */
  ulint *m_offsets{};

  /** Row data. */
  const rec_t *m_rec{};

  /** DTuple data, mapped over m_rec. */
  const dtuple_t *m_ptr{};

  /** Add column data values. */
  dtuple_t *m_add_cols{};
}


loader --->ddl::Context &m_ctx;   // DDL context structure
           ---->Builders m_builders   // array of Builders, one per index

Builder --->Loader &m_loader
             --->ddl::Context &m_ctx

ddl::Context --->Cursor  m_cursor



Parallel_cursor::Cursor

    
Builder::bulk_add_row


Data structures


--- Builder class
ddl::Builder
   enum class State : uint8_t {
    /** Initial phase. */
    INIT,

    /** Collect the rows for the index to build. */
    ADD,

    /** Setup the merge sort and add the tasks to the task queue. */
    SETUP_SORT,

    /** Sort the collected rows, if required. The builder moves to state
    BTREE_BUILD after all sort tasks are completed successfully or there
    was an error during the sort phase. */
    SORT,

    /** Build the btree. */
    BTREE_BUILD,

    /** FTS sort and build, this is done in one "step" */
    FTS_SORT_AND_BUILD,

    /** Finish the loading of the index. */
    FINISH,

    /** Stop on success. */
    STOP,

    /** Stop on error. */
    ERROR
  };
  
  /** Buffer ID. */
  size_t m_id{};

  /** Initial phase. */
  std::atomic<State> m_state{State::INIT};

  /** DDL Context. */
  ddl::Context &m_ctx;

  /** Loader that owns the instance. */
  Loader &m_loader;

  /** Index to create (if not FTS index). */
  dict_index_t *m_index{};

  /** Temporary file path. */
  const char *m_tmpdir{};

  /** Per thread context. */
  Thread_ctxs m_thread_ctxs{};

  /** For tracking duplicates. */
  dfield_t *m_prev_fields{};

  /** For collecting duplicate entries (error reporting). */
  Dup m_clust_dup{};

  /** Scoped virtual column heap. */
  Scoped_heap m_v_heap{};

  /** Scoped conversion heap. */
  Scoped_heap m_conv_heap{};

  /** The index to be built, FTS or non-FTS. */
  dict_index_t *m_sort_index{};

  /** Number of active sort tasks. */
  std::atomic<size_t> m_n_sort_tasks{};

  /** Cluster index bulk load instance to use, direct insert without
  a file sort. */
  Btree_load *m_btr_load{};

  /** Stage per builder. */
  Alter_stage *m_local_stage{};
  


--- Thread_ctx class
  struct Thread_ctx {
    /** Constructor.
    @param[in] id               Thread state ID.
    @param[in,out] key_buffer   Buffer for building the target index. Note, the
                                thread state will own the key buffer and is
                                responsible for deleting it. */
    explicit Thread_ctx(size_t id, Key_sort_buffer *key_buffer) noexcept;

    /** Destructor. */
    ~Thread_ctx() noexcept;

    /** Thread ID. */
    size_t m_id{};

    /** Key sort buffer. */
    Key_sort_buffer *m_key_buffer{}; // a fairly involved class, detailed below

    /** Total number of records added to the key sort buffer. */
    size_t m_n_recs{};

    /** Merge file handle. */
    ddl::file_t m_file{};

    /** Buffer to use for file writes. */
    ut::unique_ptr_aligned<byte[]> m_aligned_buffer{};

    /** Buffer to use for file writes. */
    IO_buffer m_io_buffer;//using IO_buffer = std::pair<byte *, os_offset_t>;

    /** Record list starting offset in the output file. */
    Merge_offsets m_offsets{}; //using Merge_offsets = std::deque<os_offset_t, ut::allocator<os_offset_t>>

    /** For spatial/Rtree rows handling. */
    RTree_inserter *m_rtree_inserter{}; // R-tree not considered here
  }
  
  using Allocator = ut::allocator<Thread_ctx *>;  // memory allocator
  using Thread_ctxs = std::vector<Thread_ctx *, Allocator>;
  
--- Key_sort_buffer class
 
Key_sort_buffer : private ut::Non_copyable


  using DTuple = dfield_t *;
  using DTuples = std::vector<DTuple, ut::allocator<DTuple>>;

  /** Memory heap where allocated */
  mem_heap_t *m_heap{};

  /** The index the tuples belong to */
  dict_index_t *m_index{};

  /** Total amount of data bytes */
  size_t m_total_size{};

  /** Number of data tuples */
  size_t m_n_tuples{};

  /** Maximum number of data tuples */
  size_t m_max_tuples{};

  /** Array of data tuples */
  DTuples m_dtuples{};

  /** Buffer size. */
  size_t m_buffer_size{};
  
  
--- Copy_ctx

  /** Row to copy. */
  const Row &m_row;

  /** MySQL table definition. */
  TABLE *m_my_table{};

  /** Number of columns to copy. */
  size_t m_n_fields{};

  /** Number of multivalue rows to add. */
  size_t m_n_mv_rows_to_add{};

  /** For storing multi value data. */
  const multi_value_data *m_mv{};

  /** Number of rows added or UNIV_NO_INDEX_VALUE if this is a multi-value
  index and current row has nothing valid to be indexed. */
  size_t m_n_rows_added{};

  /** Number of bytes copied. */
  size_t m_data_size{};

  /** Number of extra bytes used. */
  size_t m_extra_size{};

  /** Number of rows added during copy. */
  size_t m_n_recs{};

  /** ID of the current thread. */
  size_t m_thread_id{std::numeric_limits<size_t>::max()};
  
  
  
/** Index field definition */
struct Index_field {
  /** Column offset */
  size_t m_col_no{};

  /** Column prefix length, or 0 if indexing the whole column */
  size_t m_prefix_len{};

  /** Whether this is a virtual column */
  bool m_is_v_col{};

  /** Whether it has multi-value */
  bool m_is_multi_value{};

  /** true=ASC, false=DESC */
  bool m_is_ascending{};
};

/** Definition of an index being created */
struct Index_defn {
  /** Index name */
  const char *m_name{};

  /** Whether the table is rebuilt */
  bool m_rebuild{};

  /** 0, DICT_UNIQUE, or DICT_CLUSTERED */
  size_t m_ind_type{};

  /** MySQL key number, or ULINT_UNDEFINED if none */
  size_t m_key_number{ULINT_UNDEFINED};

  /** Number of fields in index */
  size_t m_n_fields{};

  /** Field definitions */
  Index_field *m_fields{};

  /** Fulltext parser plugin */
  st_mysql_ftparser *m_parser{};

  /** true if it's ngram parser */
  bool m_is_ngram{};

  /** true if we want to check SRID while inserting to index */
  bool m_srid_is_valid{};

  /** SRID obtained from dd column */
  uint32_t m_srid{};
};


/** Structure for reporting duplicate records. */
struct Dup {
  /** Report a duplicate key.
  @param[in] entry              For reporting duplicate key. */
  void report(const dfield_t *entry) noexcept;

  /** Report a duplicate key.
  @param[in] entry              For reporting duplicate key.
  @param[in] offsets            Row offsets */
  void report(const mrec_t *entry, const ulint *offsets) noexcept;

  /** @return true if no duplicates reported yet. */
  [[nodiscard]] bool empty() const noexcept { return m_n_dup == 0; }

  /** Index being sorted */
  dict_index_t *m_index{};

  /** MySQL table object */
  TABLE *m_table{};

  /** Mapping of column numbers in table to the rebuilt table
  (index->table), or NULL if not rebuilding table */
  const ulint *m_col_map{};

  /** Number of duplicates */
  size_t m_n_dup{};
};



/** DDL context/configuration. */ 
struct Context {
  /** Full text search context information and state. */
  struct FTS {
    /** Document ID sequence */
    struct Sequence {
  

      /** Current document ID. */
      doc_id_t m_doc_id{};
    };

    /** FTS index. */
    dict_index_t *m_index{};

    /** Maximum number of FTS parser and sort threads to use. */
    const size_t m_n_parser_threads{};

    /** Document ID sequence generator. */
    Sequence *m_doc_id{};

    /** FTS instance. */
    ddl::FTS *m_ptr{};
  };

  /** Scan sort and IO buffer size. */
  using Scan_buffer_size = std::pair<size_t, size_t>;


 private:
  using Key_numbers = std::vector<size_t, ut::allocator<size_t>>;
  using Indexes = std::vector<dict_index_t *, ut::allocator<dict_index_t *>>;

  /** Common error code for all index builders running in parallel. */
  std::atomic<dberr_t> m_err{DB_SUCCESS};

  /** Index where the error occurred. */
  size_t m_err_key_number{std::numeric_limits<size_t>::max()};

  /** Transaction covering the index build. */
  trx_t *m_trx{};

  /** The FTS builder. There is one FTS per table. */
  FTS m_fts;

  /** Source table, read rows from this table. */
  dict_table_t *m_old_table{};

  /** Table where indexes are created; identical to old_table unless creating
  a PRIMARY KEY. */
  dict_table_t *m_new_table{};

  /** True if creating index online. Non-online implies that we have an
  S latch on the table, therefore there can't be concurrent updates to
  the table while we are executing the DDL. We don't log the changes to
  the row log. */
  bool m_online{};

  /** Indexes to be created. */
  Indexes m_indexes{};

  /** MySQL key numbers. */
  Key_numbers m_key_numbers{};

  /** MySQL table for reporting errors/warnings. */
  TABLE *m_table{};

  /** Default value for added columns or null. */
  const dtuple_t *m_add_cols{};

  /** Mapping of old column numbers to new ones, or nullptr if none
  were added. */
  const ulint *m_col_map{};

  /** Number of added AUTO_INCREMENT columns, or ULINT_UNDEFINED if
  none added. */
  size_t m_add_autoinc{ULINT_UNDEFINED};

  /** Autoinc sequence. */
  ddl::Sequence &m_sequence;

  /** Performance schema accounting object, used by ALTER TABLE.
  stage->begin_phase_read_pk() will be called at the beginning of
  this function and it will be passed to other functions for further
  accounting. */
  Alter_stage *m_stage{};

  /** New virtual columns added along with indexes */
  const dict_add_v_col_t *m_add_v{};

  /** MySQL table used to evaluate virtual column value, see
  innobase_get_computed_value(). */
  TABLE *m_eval_table{};

  /** Skip the sorting phase if true. */
  bool m_skip_pk_sort{};

  /** Non null columns. */
  std::vector<size_t, ut::allocator<size_t>> m_nonnull{};

  /** Number of unique columns in the key. */
  size_t m_n_uniq{};

  /** true if need flush observer. */
  bool m_need_observer{};

  /** Cursor for reading the cluster index. */
  Cursor *m_cursor{};

  /** Number of bytes used. */
  size_t m_n_allocated{};

  /** Maximum number of bytes to use. */
  const size_t m_max_buffer_size{};

  /** Maximum number of threads to use. We don't do a parallel scan of the
  clustered index when FTS and/or virtual columns are involved. The build
  phase is parallel though. */
  const size_t m_max_threads{};

  /** For parallel access to the autoincrement generator. */
  ib_mutex_t m_autoinc_mutex;

  /** Heap for copies of m_add_cols. */
  mem_heap_t *m_dtuple_heap{};

  friend struct Row;
  friend class Loader;
  friend struct Cursor;
  friend struct Builder;
  friend struct ddl::FTS;
  friend struct Load_cursor;
  friend struct Btree_cursor;
  friend struct Merge_file_sort;
  friend struct Parallel_cursor;
};

}  // namespace ddl



class Loader {
 public:
  /** Builder task. */
  struct Task {

   private:
    /** Builder instance. */
    Builder *m_builder{};

    /** Thread state index. */
    size_t m_thread_id{std::numeric_limits<size_t>::max()};

    friend class Loader;
  };

 private:
  /** DDL context, shared by the loader threads. */
  ddl::Context &m_ctx;

  /** If true then use parallel scan and index build. */
  bool m_parallel{};

  /** Sort buffer size. */
  size_t m_sort_buffer_size{};

  /** IO buffer size. */
  size_t m_io_buffer_size{};

  /** Index builders. */
  Builders m_builders{};

  /** Task queue. */
  Task_queue *m_taskq{};
};








class Parallel_reader { 
 public:
  /** Maximum value for innodb-parallel-read-threads. */
  constexpr static size_t MAX_THREADS{256};

  /** Maximum value for reserved parallel read threads for data load so that
  at least this many threads are always available for data load. */
  constexpr static size_t MAX_RESERVED_THREADS{16};

  /** Maximum value for at most number of parallel read threads that can be
  spawned. */
  constexpr static size_t MAX_TOTAL_THREADS{MAX_THREADS + MAX_RESERVED_THREADS};

  using Links = std::vector<page_no_t, ut::allocator<page_no_t>>;

  // Forward declarations; this class makes use of these types.
  class Ctx;
  class Scan_ctx;
  struct Thread_ctx;

  /** Scan state. */
  enum class State : uint8_t {
    /** Unknown state. */
    UNKNOWN,

    /** Start/Finish thread state. */
    THREAD,

    /** Start/Finish Ctx state. */
    CTX,

    /** Start/Finish page read. */
    PAGE
  };

  /** Callback to initialise callers state. */
  using Start = std::function<dberr_t(Thread_ctx *thread_ctx)>; // function object type

  /** Callback to finalise callers state. */
  using Finish = std::function<dberr_t(Thread_ctx *thread_ctx)>; // function object type

  /** Callback to process the rows. */
  using F = std::function<dberr_t(const Ctx *)>; // function object type

  /** Specifies the range from where to start the scan and where to end it. */
  struct Scan_range {

    /** Start of the scan, can be nullptr for -infinity. */
    const dtuple_t *m_start{};

    /** End of the scan, can be null for +infinity. */
    const dtuple_t *m_end{};

    /** Convert the instance to a string representation. */
    [[nodiscard]] std::string to_string() const;
  };

  /** Scan (Scan_ctx) configuration. */
  struct Config {

    /** Range to scan. */
    const Scan_range m_scan_range;

    /** (Cluster) Index in table to scan. */
    dict_index_t *m_index{};

    /** Row format of table. */
    const bool m_is_compact{};

    /** Tablespace page size. */
    const page_size_t m_page_size;

    /** Btree level from which records need to be read. */
    size_t m_read_level{0};

    /** Partition id if the index to be scanned belongs to a partitioned table,
    else std::numeric_limits<size_t>::max(). */
    size_t m_partition_id{std::numeric_limits<size_t>::max()};
  };

  /** Thread related context information. */
  struct Thread_ctx {
   
    /** Thread ID. */
    size_t m_thread_id{std::numeric_limits<size_t>::max()};

    /** Callback information related to the thread.
    @note Needs to be created and destroyed by the callback itself. */
    void *m_callback_ctx{};

    /** BLOB heap per thread. */
    mem_heap_t *m_blob_heap{};

    /** Worker thread state. */
    State m_state{State::UNKNOWN};

    /** Current persistent cursor. */
    PCursor *m_pcursor{}; // persistent cursor

  };



  /** Add an execution context to the run queue.
  @param[in] ctx                Execution context to add to the queue. */
  void enqueue(std::shared_ptr<Ctx> ctx);

  /** Fetch the next job execute.
  @return job to execute or nullptr. */
  [[nodiscard]] std::shared_ptr<Ctx> dequeue();


 private:
  // clang-format off
  using Ctxs =
      std::list<std::shared_ptr<Ctx>,
                ut::allocator<std::shared_ptr<Ctx>>>;

  using Scan_ctxs =
      std::list<std::shared_ptr<Scan_ctx>,
                ut::allocator<std::shared_ptr<Scan_ctx>>>;

  // clang-format on

  /** Maximum number of worker threads to use. */
  size_t m_max_threads{};

  /** Number of worker threads that will be spawned. */
  size_t m_n_threads{0};

  /** Mutex protecting m_ctxs. */
  mutable ib_mutex_t m_mutex;

  /** Contexts that must be executed. */
  Ctxs m_ctxs{};

  /** Scan contexts. */
  Scan_ctxs m_scan_ctxs{};

  /** For signalling worker threads about events. */
  os_event_t m_event{};

  /** Value returned by previous call of os_event_reset() on m_event. */
  uint64_t m_sig_count;

  /** Counter for allocating scan context IDs. */
  size_t m_scan_ctx_id{};

  /** Context ID. Monotonically increasing ID. */
  std::atomic_size_t m_ctx_id{};

  /** Total tasks executed so far. */
  std::atomic_size_t m_n_completed{};

  /** Callback at start (before processing any rows). */
  Start m_start_callback{};

  /** Callback at end (after processing all rows). */
  Finish m_finish_callback{};

  /** Error during parallel read. */
  std::atomic<dberr_t> m_err{DB_SUCCESS};

  /** List of threads used for parallel_read purposes. */
  std::vector<IB_thread, ut::allocator<IB_thread>> m_parallel_read_threads;

  /** Number of threads currently doing parallel reads. */
  static std::atomic_size_t s_active_threads;

  /** If the caller wants to wait for the parallel_read to finish it's run */
  bool m_sync;

  /** Context information related to each parallel reader thread. */
  std::vector<Thread_ctx *, ut::allocator<Thread_ctx *>> m_thread_ctxs;
}







--------------------purge


trx_purge_attach_undo_recs

  while (n_pages_handled < batch_size) {
    /* Track the max {trx_id, undo_no} for truncating the
    UNDO logs once we have purged the records. */

    if (trx_purge_check_limit()) {
      purge_sys->limit = purge_sys->iter;
    }

    purge_node_t::rec_t rec;

    /* Fetch the next record, and advance the purge_sys->iter. */
    rec.undo_rec = trx_purge_fetch_next_rec(&rec.modifier_trx_id, &rec.roll_ptr,
                                            &n_pages_handled, heap);

    if (rec.undo_rec == &trx_purge_ignore_rec) {
      continue;

    } else if (rec.undo_rec == nullptr) {
      break;
    }

    purge_groups.add(rec); // add the record to the purge group
  }
  
  

row_purge_step

  if (node->recs != nullptr && !node->recs->empty()) { // loop over the records queued for purge
    purge_node_t::rec_t rec;

    rec = node->recs->front();
    node->recs->pop_front();

    node->roll_ptr = rec.roll_ptr;
    node->modifier_trx_id = rec.modifier_trx_id;

    row_purge(node, rec.undo_rec, thr);
    ...
  }
  
  
  
  
  
  
  
  

               session 2                                          session 1
select * from testv1;
(del-flag marks above the oldest read view are not
purged, and active transactions are of course not
purged either; but rows carrying older del-flag marks
may still be undergoing purge, so wait and check
SHOW ENGINE INNODB STATUS:
  Purge done for trx's n:o < 561475 undo n:o < 0
  state: running but idle
until purge is confirmed idle)
                                                                  run the DDL


for (auto builder : builders) { // loop over every index
      const auto err = builder->add_row(*this, row, thread_id, [&]() {
      
 ---- 

mysql_alter_table
  ->ha_innobase::check_if_supported_inplace_alter
    ->innobase_support_instant
    
  ->mysql_inplace_alter_table
    ->handler::ha_prepare_inplace_alter_table
      ->ha_innobase::prepare_inplace_alter_table
         ->ha_innobase::prepare_inplace_alter_table_impl<dd::Table>  handler0alter.cc:5499
           ->innobase_support_instant
           
    ->handler::ha_inplace_alter_table
      ->ha_innobase::inplace_alter_table
        ->ha_innobase::inplace_alter_table_impl<dd::Table>
           ->  if (!(ha_alter_info->handler_flags & INNOBASE_ALTER_DATA) || is_instant(ha_alter_info)) { // whether the instant path is taken
          
  ->copy_data_between_tables

---version greater than 64

Breakpoint 25, ha_innobase::check_if_supported_inplace_alter (this=0x7fffc004fbd0, altered_table=0x7fffc004e990, ha_alter_info=0x7fff907e96e0) at /pxc/mysql-8.0.36/storage/innobase/handler/handler0alter.cc:1064
1064              break;
(gdb) p m_prebuilt->table->current_row_version
$210 = 64
 
