author:sufei
源码版本:5.7.26
一、MySQL并行复制过程的发展
- 库间并发
理论依据: 一个数据库实例内可能会有多个库(schema),不同的库之间没有什么依赖关系,所以在slave那边为每一个库(schema)单独起一个SQL线程,这样就能通过多线程并行复制的方式来提高主从复制的效率。
缺点: 实际业务基本都是一个库,所以这种库间并发就没什么作用了;也就是说这个方式的适用场景比较少
- 组提交
理论依据: 如果多个事务他们能在同一时间内提交,这个就间接说明了这个几个事务锁上是没有冲突的,也是就说他们各自持有不同的锁,互不影响;逻辑上我们几个事务看一个组,在slave以“组”为单位分配给SQL线程执行,这样多个SQL线程就可以并行跑了;而且不在以库为并行的粒度,效果上要比“库间并发”要好一些
缺点: 必须是在主上并行提交的事务才能在从上并行回放,如果主上并发压力不大,那么就无法享受到并行复制带来的好处。无论如何,从库上并行回放的速度还是取决于主库上并行提交的情况。
- WriteSet
理论依据: 在开发Group Replication中,为了支持多主写入,通过Paxso协议在多个MySQL节点间分发Binlog,然后Certify阶段在使用了WriteSet来决定Binlog中的事务是否有冲突,同时,在写入RelayLog 时会将没有冲突的事务的 last_committed 值设置为相同的值。
## create a group replication cluster.
> STOP GROUP_REPLICATION; START GROUP_REPLICATION;
Query OK, 0 rows affected (9.10 sec)
## All the next commands on the primary member of the group:
> CREATE DATABASE test_ws_mgr ;
Query OK, 1 row affected (0.01 sec)
> CREATE TABLE test_ws_mgr.test ( id int primary key auto_increment, str varchar(64) not null );
Query OK, 1 row affected (0.01 sec)
> INSERT INTO test_ws_mgr.test(`str`) VALUES ("a");
Query OK, 1 row affected (0.01 sec)
> INSERT INTO test_ws_mgr.test(`str`) VALUES ("b");
Query OK, 1 row affected (0.01 sec)
> INSERT INTO test_ws_mgr.test(`str`) VALUES ("c");
Query OK, 1 row affected (0.01 sec)
在一个MGR 集群中创建了一个数据库和一个INNODB表,并插入了三条记录。这个时候,如何查询Primary 节点上的Binlog,可能会得到如下结果
# mysqlbinlog mysql-bin.N | grep last_ | sed -e 's/server id.*last/[...] last/' -e 's/.rbr_only.*/ [...]/'
#180106 19:31:59 [...] last_committed=0 sequence_number=1 [...] -- CREATE DB
#180106 19:32:02 [...] last_committed=1 sequence_number=2 [...] -- CREATE TB
#180106 19:32:05 [...] last_committed=2 sequence_number=3 [...] -- INSERT a
#180106 19:32:08 [...] last_committed=3 sequence_number=4 [...] -- INSERT b
#180106 19:32:11 [...] last_committed=4 sequence_number=5 [...] -- INSERT c
可以看到,由于是在一个Session中,这些操作按着串行的顺序有着不同的 last_committed , 正常情况下,这些BinlogEvent应该在从机上同样以串行的方式回放。我们看一下在MGR集群中的relaylog 情况。
# mysqlbinlog mysql-relay.N | grep -e last_ | sed -e 's/server id.*last/[...] last/' -e 's/.rbr_only.*/ [...]/'
#180106 19:31:36 [...] last_committed=0 sequence_number=0 [...]
#180106 19:31:36 [...] last_committed=1 sequence_number=2 [...] -- CREATE DB
#180106 19:31:36 [...] last_committed=2 sequence_number=3 [...] -- CREATE TB
#180106 19:31:36 [...] last_committed=3 sequence_number=4 [...] -- INSERT a
#180106 19:31:36 [...] last_committed=3 sequence_number=5 [...] -- INSERT b
#180106 19:31:36 [...] last_committed=3 sequence_number=6 [...] -- INSERT c
在 Secondary 节点的 RelayLog 中, 这些事务有着相同的 last_committed 值,也就是说这些事务在MGR集群中,回放的时候可以以并行的方式回放。
正是 WriteSet 技术检测可以不同事务之间是否存在写冲突,并重规划了事务的并行回放,基于该原理,应用到主从复制的架构中,这使得从机上的并发程度不再依赖于主机,而是基于事务本身的更新冲突来确定并行关系。
二、引入参数讲解
binlog_transaction_depandency_tracking :用于控制如何决定事务的依赖关系。该值有三个选项
- COMMIT_ORDERE(默认) : 表示继续使用5.7原有的基于组提交的方式决定事务的依赖关系;
- WRITESET :表示使用写集合来决定事务的依赖关系;
- WRITESET_SESSION :表示使用 WriteSet 来决定事务的依赖关系,但是同一个Session内的事务不会有相同的 last_committed 值。
binlog_transaction_dependency_history_size : WRITESET 是一个 hash 数组,大小由参数 binlog_transaction_dependency_history_size 决定
WriteSet 是通过检测两个事务是否更新了相同的记录来判断事务能否并行回放的,因此需要在运行时保存已经提交的事务信息以记录历史事务更新了哪些行。记录历史事务的参数为 binlog_transaction_dependency_history_size . 该值越大可以记录更多的已经提交的事务信息,不过需要注意的是,这个值并非指事务大小,而是指追踪的事务更新信息的数量。
transaction_write_set_extraction :控制检测事务依赖关系时采用的HASH算法,有三个取值 OFF| XXHASH64 | MURMUR32, 如果 binlog_transaction_depandency_tracking 取值为 WRITESET 或 WRITESET_SESSION, 那么该值取值不能为OFF,且不能变更。
三、源码分析
在代码实现上,MySQL采用一个 vector<uint64> write_set的变量存储已经提交的事务的HASH值,所有已经提交的事务的所修改的主键和非空的 Unique Key 的值经过HASH后与该vector中的值对比,以判断当前提交的事务是否与已经提交的事务更新了同一行,并以此确定依赖关系。
3.1 writeset生成
写入session writeset的入口是 rpl_write_set_handler.cc:add_pke() ,而且函数add_pke被binlog_log_row调用,也就是每个更新,插入,删除都会调用add_pke函数,然后更新session writeset集合。writeset的写入是每一行都形成一组uint64 hash,并将其添加到session writeset中;hash字符串由更新行的 index,db,table,value 组成。
下面add_pke逻辑如下:
如果表中存在索引:
将数据库名,表名信息写入临时变量
循环扫描表中每个索引:
如果不是唯一索引:
退出本次循环继续循环。
循环两种生成数据的方式(MySQL格式和字符串格式,下面之分析字符串格式):
将索引名字写入到pke中。
将临时变量信息写入到pke中。
循环扫描索引中的每一个字段:
将每一个字段的信息写入到pke中。
如果字段扫描完成:
将pke生成hash值并且写入到写集合中。
3.1.1pke值的构成
如下表:
mysql> show create table sufei\G
*************************** 1. row ***************************
Table: sufei
Create Table: CREATE TABLE `sufei` (
`id1` int(11) DEFAULT NULL,
`id2` int(11) DEFAULT NULL,
`id3` int(11) NOT NULL,
PRIMARY KEY (`id3`),
UNIQUE KEY `id1` (`id1`),
KEY `id2` (`id2`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
1 row in set (0.00 sec)
写入一行数据:
mysql> insert into sufei values(1,21,31);
GDB可以看到该语句产生了两条pke值,分别为:
## 主键索引
(gdb) p pke
$2 = {
static npos = 18446744073709551615,
_M_dataplus = {
<std::allocator<char>> = {
<__gnu_cxx::new_allocator<char>> = {<No data fields>}, <No data fields>},
members of std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >::_Alloc_hider:
_M_p = 0x7f321c9408c0 "PRIMARY½test½4sufei½531½2" // ½为分隔符
},
_M_string_length = 29,
{
_M_local_buf = "\300\003", '\000' <repeats 13 times>,
_M_allocated_capacity = 960
}
}
可以看到其pke的值为:"PRIMARY½test½4sufei½531½2",构成如下:
PRIMARY + 分隔符 + 库名 + 分隔符 + 库名长度 + 表名 + 分隔符 + 表名长度 + 主键数据值 + 分隔符 + 主键值长度
## 唯一键索引
(gdb) p pke
$3 = {
static npos = 18446744073709551615,
_M_dataplus = {
<std::allocator<char>> = {
<__gnu_cxx::new_allocator<char>> = {<No data fields>}, <No data fields>},
members of std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >::_Alloc_hider:
_M_p = 0x7f321c9408c0 "id1½test½4sufei½51½1"
},
_M_string_length = 24,
{
_M_local_buf = "\300\003", '\000' <repeats 13 times>,
_M_allocated_capacity = 960
}
}
唯一键pke值为"id1½test½4sufei½51½1",构成如下:
非唯一索引名称 + 分隔符 + 库名 + 分隔符 + 库名长度 + 表名 + 分隔符 + 表名长度 + 索引字段1数值 + 分隔符 +索引字段1长度 [ + 索引字2段数值 + 分隔符 + 索引字段2长度 ..... ]
从上面gdb调试可以看出,上面插入一行记录,实际上在writeset中,插入了两条pke的hash值。所以为了比较两事务之间的冲突,相关操作行的hash值是由主键以及相应的唯一键构成,通过上述pke的构建规则来实现冲突检测。
3.1.2 源码讲解
void add_pke(TABLE *table, THD *thd)
{
// 获得本会话的writeset
Rpl_transaction_write_set_ctx* ws_ctx=
thd->get_transaction()->get_transaction_write_set_ctx();
int writeset_hashes_added= 0;
// 如果存在索引,并且有主键才进行add_pke操作,主键是有着极其重要的地位,是判断是否冲突的重要依据
if(table->key_info && (table->s->primary_key < MAX_KEY))
{
char value_length_buffer[VALUE_LENGTH_BUFFER_SIZE];
char* value_length= NULL;
std::string pke_schema_table;
pke_schema_table.reserve(NAME_LEN * 3);
pke_schema_table.append(HASH_STRING_SEPARATOR);
pke_schema_table.append(table->s->db.str, table->s->db.length);
pke_schema_table.append(HASH_STRING_SEPARATOR);
value_length= my_safe_itoa(10, table->s->db.length, &value_length_buffer[VALUE_LENGTH_BUFFER_SIZE-1]);
pke_schema_table.append(value_length);
pke_schema_table.append(table->s->table_name.str, table->s->table_name.length);
pke_schema_table.append(HASH_STRING_SEPARATOR);
value_length= my_safe_itoa(10, table->s->table_name.length,
&value_length_buffer[VALUE_LENGTH_BUFFER_SIZE-1]);
pke_schema_table.append(value_length);
// 上面将数据库名,表名信息写入临时变量pke_schema_table中
// 依次扫描每个索引
for (uint key_number=0; key_number < table->s->keys; key_number++)
{
// Skip non unique.如果是非唯一索引,跳过继续下一个索引
if (!((table->key_info[key_number].flags & (HA_NOSAME )) == HA_NOSAME))
continue;
for (int collation_conversion_algorithm= COLLATION_CONVERSION_ALGORITHM;
collation_conversion_algorithm >= 0;
collation_conversion_algorithm--)
{
pke.clear();
pke.append(table->key_info[key_number].name);// 写入索引名
pke.append(pke_schema_table);// 写入 库表名临时变量
uint i= 0;
// 循环变量每个索引字段
for (/*empty*/; i < table->key_info[key_number].user_defined_key_parts; i++)
{
// read the primary key field values in str.
int index= table->key_info[key_number].key_part[i].fieldnr;
size_t length= 0;
/* Ignore if the value is NULL. 如果该字段为空值,则跳过*/
if (table->field[index-1]->is_null())
break;
// convert using collation support conversion algorithm
if (COLLATION_CONVERSION_ALGORITHM == collation_conversion_algorithm)
{
const CHARSET_INFO* cs= table->field[index-1]->charset();
length= cs->coll->strnxfrmlen(cs,
table->field[index-1]->pack_length());
}
// convert using without collation support algorithm
else
{
table->field[index-1]->val_str(&row_data);
length= row_data.length();
}
if (pk_value_size < length+1)
{
pk_value_size= length+1;
pk_value= (char*) my_realloc(key_memory_write_set_extraction,
pk_value, pk_value_size,
MYF(MY_ZEROFILL));
}
// convert using collation support conversion algorithm
if (COLLATION_CONVERSION_ALGORITHM == collation_conversion_algorithm)
{
/*
convert to normalized string and store so that it can be
sorted using binary comparison functions like memcmp.
将索引值写入
*/
table->field[index-1]->make_sort_key((uchar*)pk_value, length);
pk_value[length]= 0;
}
// convert using without collation support algorithm
else
{
strmake(pk_value, row_data.c_ptr_safe(), length);
}
pke.append(pk_value, length);// 将主键值写入
pke.append(HASH_STRING_SEPARATOR); // 分隔符写入
value_length= my_safe_itoa(10, length,
&value_length_buffer[VALUE_LENGTH_BUFFER_SIZE-1]);
pke.append(value_length); // 值长度写入
}
/*
If any part of the key is NULL, ignore adding it to hash keys.
NULL cannot conflict with any value.
Eg: create table t1(i int primary key not null, j int, k int,
unique key (j, k));
insert into t1 values (1, 2, NULL);
insert into t1 values (2, 2, NULL); => this is allowed.
*/
// 如果所有的索引字段都扫描完成
if (i == table->key_info[key_number].user_defined_key_parts)
{
// generate_hash_pke函数 将pke值变为hash,然后存入session writeset
generate_hash_pke(pke, collation_conversion_algorithm, thd);
writeset_hashes_added++;
#ifndef DBUG_OFF
write_sets.push_back(pke);
#endif
}
else
{
/* This is impossible to happen in case of primary keys */
DBUG_ASSERT(key_number !=0);
}
}
}
// 记录外键相关信息,这里省略
if (!(thd->variables.option_bits & OPTION_NO_FOREIGN_KEY_CHECKS))
{}//这里省略
}
//如果没有表索引,则设置missing key
if (writeset_hashes_added == 0)
ws_ctx->set_has_missing_keys();
}
add_pke() 会记录每一条主键、唯一键、外键信息,并且会记录是否更新了无主键表,或者更新了外键关联的列。现在我们已经知道了,在binlog write row时,会通过调用add_pke函数,将修改行的信息(主键,唯一键,外键,每一个生产一个pke的hash值)写入session writeset集合中。
那如何使用该session writeset集合来达到冲突检测,从而控制从库并行回放呢?**
3.2 WriteSet使用
我们知道:在组提交过程中,只要事务的last_committed一致,则这些事务就可以在从库并行执行。那现在只要研究如何通过writeset来调整binlog写入的last_committed即可。
下面看一下在写入binlog时,是如何进行last_committed调整的。涉及的源代码的入口为MYSQL_BIN_LOG::write_gtid ,该函数是写入gtid事件的函数调用,其中last_committed的初始化函数在write_gtid函数入口处,其获取last_committed是通过调用 m_dependency_tracker.get_dependency()来实现
void Transaction_dependency_tracker::get_dependency(THD *thd,
int64 &sequence_number,
int64 &commit_parent)
{
sequence_number= commit_parent= 0;
switch(m_opt_tracking_mode) // 也就是上面系统变量binlog_transaction_depandency_tracking
{
// COMMIT_ORDERE 只调用 m_commit_order.get_dependency(),与组提交一直
case DEPENDENCY_TRACKING_COMMIT_ORDER:
m_commit_order.get_dependency(thd, sequence_number, commit_parent);
break;
// WRITESET 在COMMIT_ORDERE的基础上再调用m_writeset.get_dependency()
case DEPENDENCY_TRACKING_WRITESET:
m_commit_order.get_dependency(thd, sequence_number, commit_parent);
m_writeset.get_dependency(thd, sequence_number, commit_parent);
break;
// WRITESET_SESSION 在WRITESET的基础上再调用m_writeset_session.get_dependency
case DEPENDENCY_TRACKING_WRITESET_SESSION:
m_commit_order.get_dependency(thd, sequence_number, commit_parent);
m_writeset.get_dependency(thd, sequence_number, commit_parent);
m_writeset_session.get_dependency(thd, sequence_number, commit_parent);
break;
default:
DBUG_ASSERT(0); // blow up on debug
/*
Fallback to commit order on production builds.
*/
m_commit_order.get_dependency(thd, sequence_number, commit_parent);
}
}
void Writeset_trx_dependency_tracker::get_dependency(THD *thd,
int64 &sequence_number,
int64 &commit_parent)
{
// 获取当前session writeset
Rpl_transaction_write_set_ctx *write_set_ctx=
thd->get_transaction()->get_transaction_write_set_ctx();
std::set<uint64> *writeset= write_set_ctx->get_write_set();
#ifndef DBUG_OFF
/* The writeset of an empty transaction must be empty. */
if (is_empty_transaction_in_binlog_cache(thd))
DBUG_ASSERT(writeset->size() == 0);
#endif
/*
检测本事务是否可以使用writeset,以下情况不能使用writeset
1、ddl语句
2、session writeset 的 hash 算法和 history writeset 不同
3、事务更新了被外键关联的字段
*/
bool can_use_writesets=
// empty writeset implies DDL or similar, except if there are missing keys
(writeset->size() != 0 || write_set_ctx->get_has_missing_keys() ||
/*
The empty transactions do not need to clear the writeset history, since
they can be executed in parallel.
*/
is_empty_transaction_in_binlog_cache(thd)) &&
// hashing algorithm for the session must be the same as used by other rows in history
(global_system_variables.transaction_write_set_extraction ==
thd->variables.transaction_write_set_extraction) &&
// must not use foreign keys
!write_set_ctx->get_has_related_foreign_keys();
bool exceeds_capacity= false;
if (can_use_writesets)
{
// 判断 write_history 是否超过最大值
exceeds_capacity=
m_writeset_history.size() + writeset->size() > m_opt_max_history_size;
/*
遍历 session 的 writeset,查找在 writeset_history 中的冲突行
如果冲突,则更新last_parent(last_parent 是临时变量,并不是 commit parent),并更新冲突行
如果没冲突,并且没超过最大值,则插入 write_history
*/
int64 last_parent= m_writeset_history_start; // 初始化为该组history的commit_parent
for (std::set<uint64>::iterator it= writeset->begin();
it != writeset->end(); ++it)
{
Writeset_history::iterator hst= m_writeset_history.find(*it);
if (hst != m_writeset_history.end()) // 冲突
{
if (hst->second > last_parent && hst->second < sequence_number)
last_parent= hst->second; //更新last_parent,找到冲突的最大的num
hst->second= sequence_number; //将冲突行更新为最新sequence_number
}
else
{
if (!exceeds_capacity)
m_writeset_history.insert(std::pair<uint64, int64>(*it, sequence_number));
}
}
/*
如果更新了没有主键的表,则不能更新 commit_parent,意思就是主键
但是因为要更新 writeset_history 所以不能直接设置 can_use_writesets,所以这将commit_parent设置回来
*/
if (!write_set_ctx->get_has_missing_keys())
{
commit_parent= std::min(last_parent, commit_parent);
}
}
// 如果 writeset_history 已满,或者不可以使用 WriteSet,则清空WriteSet
if (exceeds_capacity || !can_use_writesets)
{
m_writeset_history_start= sequence_number; //设置下一组writeset的commit_parent初始值
m_writeset_history.clear();
}
}
void Writeset_session_trx_dependency_tracker::get_dependency(THD *thd,
int64 &sequence_number,
int64 &commit_parent)
{
// 获取该会话的最大sequence_number
int64 session_parent= thd->rpl_thd_ctx.dependency_tracker_ctx().
get_last_session_sequence_number();
// 为保证同一会话事务不并行,调整commit_parent不小于session_parent
if (session_parent != 0 && session_parent < sequence_number)
commit_parent= std::max(commit_parent, session_parent);
// 更新session parent为最新的sequence_number
thd->rpl_thd_ctx.dependency_tracker_ctx().
set_last_session_sequence_number(sequence_number);
}