1: Overview of ClickHouse Data Insertion
The ClickHouse insert flow
When an INSERT is executed against a ReplicatedMergeTree table to write data, the core INSERT flow kicks in. The whole flow below is described top-down in chronological order.
NEXT-1: Create the replicas
Replica 1:
create table test_replicated(
id Int8,
ctime DateTime,
name String
)ENGINE = ReplicatedMergeTree('/clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated','replic1')
partition by toYYYYMM(ctime)
order by id;
Replica 2:
create table test_replicated(
id Int8,
ctime DateTime,
name String
)ENGINE = ReplicatedMergeTree('/clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated','replic2')
partition by toYYYYMM(ctime)
order by id;
During creation, ReplicatedMergeTree performs several initialization steps:
- Initialize all the ZooKeeper nodes under the table's zk_path.
The table's zk_path:
ls /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated
[alter_partition_version, block_numbers, blocks, columns, leader_election, log, metadata, mutations, nonincrement_block_numbers, part_moves_shard, pinned_part_uuids, quorum, replicas, temp, zero_copy_hdfs, zero_copy_s3]
- Register its own replica instance (replic1 or replic2) under the /replicas node.
zk path of replic1:
ls /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/replicas/replic1
[columns, flags, host, is_active, is_lost, log_pointer, max_processed_insert_time, metadata, metadata_version, min_unprocessed_insert_time, mutation_pointer, parts, queue]
- Start a watch task that listens on the /log node.
[zk: 10.178.19.198:2181(CONNECTED) 24] ls /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/log
[log-0000000000]
NEXT-2: Write data to the first replica
1) Terminology
Block: an INSERT splits its data into one or more blocks according to max_insert_block_size (default 1048576 rows). The block is therefore the basic unit of a write, and each block is written atomically and has a unique identity.
The size of each compressed data block, measured by its uncompressed bytes, is kept strictly between 64KB and 1MB; the lower and upper bounds are set by min_compress_block_size (default 65536) and max_compress_block_size (default 1048576). The final size of each compressed block then depends on how much data the index_granularity intervals actually contain (the query after this list shows how to inspect these settings):
- Data in a single index granularity interval < 64KB: keep accumulating data from the following granularity intervals until the size reaches 64KB, then emit the next compressed block.
- Data in a single index granularity interval between 64KB and 1MB: emit the next compressed block directly.
- Data in a single index granularity interval > 1MB: truncate at 1MB, emit a compressed block, and apply these three rules again to the remaining data. This is how a single batch of data can produce several compressed blocks.
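The three thresholds above are ordinary settings and can be checked on any server; a minimal query (the setting names are the standard ones mentioned above, the values returned are whatever your configuration uses):
```
SELECT name, value
FROM system.settings
WHERE name IN ('max_insert_block_size', 'min_compress_block_size', 'max_compress_block_size');
```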
2) Insert data
Write the following rows on replic1:
insert into test_replicated values(1,now(),'zs'),(2,now(),'ls');
After the insert, ZooKeeper contains the following data:
1: /blocks
ls /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/blocks
[202201_5253379567567382390_63093905895384417]
get /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/blocks/202201_5253379567567382390_63093905895384417
202201_0_0_0
2: log
ls /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/log
[log-0000000000]
get /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/log/log-0000000000
format version: 4
create_time: 2022-01-07 21:37:16
source replica: replic1
block_id: 202201_5253379567567382390_63093905895384417
get
202201_0_0_0
part_type: Compact
NEXT-3: The second replica pulls the log
replic2 keeps watching the /log node. When replic1 pushes log/log-0000000000, replic2 triggers its log-pulling task and advances the log_pointer:
get /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/replicas/replic1/log_pointer
1
After pulling the LogEntry, replic2 does not execute it directly; instead it converts it into a task object and places it in its queue:
ls /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/replicas/replic2/queue
queue-0000000001
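The pulled entries can also be inspected on the pulling replica through the system.replication_queue table (a minimal sketch; the selected columns are just illustrative):
```
SELECT node_name, type, source_replica, new_part_name, is_currently_executing
FROM system.replication_queue
WHERE table = 'test_replicated';
```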
NEXT-4: The second replica requests a download from another replica
replic2 starts executing tasks from its /queue. When it sees an entry of type get, ReplicatedMergeTree knows that the data part has already been written successfully on some remote replica and that it must fetch that data itself.
replic2 then picks one of the remote replicas as the download source. The selection algorithm is roughly:
(1) Get all replica nodes from /replicas.
(2) Iterate over these replicas and pick one. The chosen replica should have the largest log_pointer and the fewest children under /queue. The largest log_pointer means that replica has executed the most log entries and its data should be the most complete; the smallest /queue means it currently carries the lightest task load (see the queries after the example below).
For example:
Sending request to http://replic1.clickhouse-headless.cluster.local:9009/?endpoint=DataPartsExchange……
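Both selection criteria can be checked directly from ZooKeeper with the system.zookeeper table (a minimal sketch using the zk_path from the examples above):
```
-- all registered replicas
SELECT name FROM system.zookeeper
WHERE path = '/clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/replicas';

-- a candidate's log_pointer (larger is better)
SELECT value FROM system.zookeeper
WHERE path = '/clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/replicas/replic1'
  AND name = 'log_pointer';

-- how many entries are still queued on that candidate (fewer is better)
SELECT count() AS queue_size FROM system.zookeeper
WHERE path = '/clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/replicas/replic1/queue';
```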
NEXT-5: The first replica serves the download
The DataPartsExchange service on replic1 receives the request, understands the caller's intent from the parameters, and sends the local data part 202107_0_0_0 back to replic2 through the DataPartsExchange response.
NEXT-6: The second replica downloads the data and completes the local write
The data is first written to a temporary directory:
```
tmp_fetch_202107_1_1_0
```
Once all the data has been received, the directory is renamed:
```
Renaming temporary part tmp_fetch_202107_1_1_0 to 202107_1_1_0.
```
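While a fetch is still in flight, it can be observed on the downloading replica via the system.replicated_fetches table (a sketch; the table is empty once the fetch has completed):
```
SELECT result_part_name, source_replica_hostname, progress, to_detached
FROM system.replicated_fetches;
```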
2: The ClickHouse Replica Synchronization Process
2.1 Which operations trigger data synchronization?
Data synchronization covers both metadata and content data. In ClickHouse, data insertion (INSERT), metadata changes (ALTER), content data changes (MUTATION), and merges (MERGE) all trigger synchronization.
Among them, MERGE and MUTATION operations can only be driven by the leader replica. Leader election works as follows:
/leader_election: used for leader election. The leader replica drives MERGE and MUTATION operations (ALTER DELETE and ALTER UPDATE); after these tasks finish on the leader, the resulting events are distributed to the other replicas through ZooKeeper.
Leader election is done by creating a child node under /leader_election/; the first replica whose insert succeeds becomes the leader.
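Which replica currently holds leadership can be checked on each node through system.replicas (a minimal sketch; in recent versions every replica may report is_leader = 1):
```
SELECT replica_name, is_leader, is_readonly
FROM system.replicas
WHERE table = 'test_replicated';
```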
2.2 INSERT synchronization
When ClickHouse executes an INSERT into a ReplicatedMergeTree table, the data is written to disk/memory in units of data parts. After each data part is written, a commit is performed; this commit generates a GET_PART log entry and uploads it to ZooKeeper for the other replicas to pull and apply.
So an INSERT triggers synchronization of content data at the granularity of a data part.
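The data parts created locally by the INSERT can be listed through system.parts (a minimal sketch):
```
SELECT partition, name, part_type, rows, active
FROM system.parts
WHERE table = 'test_replicated';
```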
Example contents of a GET_PART log entry:
format version: 4
create_time: 2022-01-07 21:37:16
source replica: replic1
block_id: 202201_5253379567567382390_63093905895384417
get
202201_0_0_0
part_type: Compact
2.3 Synchronization log and replica queue (log, queue)
2.3.1 Synchronization log (log)
log lives under zookeeper_path and stores the data-synchronization log shared by all replicas (each record is called a log entry). The LogEntry types supported by ClickHouse include GET_PART, MERGE_PARTS, MUTATE_PART, and ATTACH_PART, among others.
Notes:
Every insert generates a new log entry:
[zk: localhost:2181(CONNECTED) 20] ls /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/log
[log-0000000000, log-0000000001]
How it works: executing insert into ... on one replica uploads a GET_PART log entry under the log node.
The znodes under zookeeper_path/log are named log-seqNum, where seqNum is the auto-incrementing sequence number assigned when the sequential znode is created, e.g. log-0000000001.
What a log entry contains:
[zk: localhost:2181(CONNECTED) 22] get /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/log/log-0000000001
format version: 4
create_time: 2022-01-07 22:08:20
source replica: replic1 // source replica of the data
block_id: 202201_4569856920166365182_7768759302129368290 // block_id of the inserted block
get
202201_1_1_0 // the part to download
part_type: Compact
2.3.2 Replica queue (queue)
queue lives under replica_path and stores the log entries that the current replica still needs to apply. The entries in the queue are pulled from log (by the queue updating task); once a log entry has been pulled by every replica, it can be removed from log, which is done by a background cleanup thread.
The znodes under replica_path/queue are named queue-seqNum, where seqNum is the auto-incrementing sequence number assigned when the sequential znode is created, e.g. queue-0000000001.
replica_path also contains a log_pointer node, which stores the largest log entry id pulled so far plus one, i.e. the id of the next log entry to pull.
[zk:localhost:2181(CONNECTED) 11] get /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/replicas/replic1/log_pointer // replica 1
2
[zk:localhost:2181(CONNECTED) 12] get /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/replicas/replic2/log_pointer // replica 2
2
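The same pointers are also exposed through system.replicas on each server (a minimal sketch):
```
SELECT replica_name, log_pointer, log_max_index, queue_size
FROM system.replicas
WHERE table = 'test_replicated';
```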
3 Tasks responsible for data synchronization
Each replica runs a set of tasks responsible for data synchronization. They interact with ZooKeeper to obtain the latest synchronization work, maintain a task queue in local memory, schedule it through task pools, and update the corresponding state in ZooKeeper once execution finishes. The sections below introduce each kind of task.
3.1 queue updating task
The queue updating task is responsible for updating the current replica's queue of synchronization log entries.
The StorageReplicatedMergeTree class behind a ReplicatedMergeTree table holds a queue_updating_task object. It is a task scheduled by the background schedule pool (BackgroundSchedulePool), and the function it runs is
StorageReplicatedMergeTree::queueUpdatingTask()
The queue updating task does the following:
1. Pull all log entries under zookeeper_path/log from ZooKeeper, and read the current replica's log_pointer from replica_path/log_pointer (i.e. the id of the next log entry to pull).
2. Use log_pointer to locate the entries that need to be pulled; if log_pointer is empty, start from the smallest entry in the log.
3. Copy the selected log entries to replica_path/queue: one by one, create a new sequential znode under replica_path/queue whose content is the selected entry's content. Note that the names of the znodes created here do not match the names under zookeeper_path/log.
4. Update replica_path/log_pointer to the largest pulled entry id + 1.
5. Insert the pulled log entries into the in-memory synchronization task queue, i.e. into ReplicatedMergeTreeQueue::queue.
6. Trigger the queue executing task (described below).
See ReplicatedMergeTreeQueue::pullLogsToQueue for the implementation details of these steps.
void StorageReplicatedMergeTree::queueUpdatingTask()
{
....
try
{
queue.pullLogsToQueue(getZooKeeper(), queue_updating_task->getWatchCallback());
.....
}
catch (const Coordination::Exception & e)
{
....
}
....
}
3.2 The ReplicatedMergeTreeQueue::queue task queue
StorageReplicatedMergeTree::queueUpdatingTask() inserts the pulled log entries into this in-memory synchronization task queue.
3.3 queue executing task
Once the queue updating task has pulled the synchronization log entries into local memory, a task is needed to execute them. That task is the queue executing task.
The function that the queue executing task ultimately runs is StorageReplicatedMergeTree::queueTask.
The main steps of queueTask are described below.
step1: Select the log entry to process
For a GET_PART log entry, if the data part it produces is covered by the resulting data parts of a log entry that is currently being executed, the entry is not selected in this round.
step2: Process the selected log entry
After an entry is selected, StorageReplicatedMergeTree::queueTask calls ReplicatedMergeTreeQueue::processEntry to process it. The third argument of processEntry is the function used to execute the log entry; here it is StorageReplicatedMergeTree::executeLogEntry.
Source:
BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::queueTask()
{
....
// The entry being processed is captured in the `selected` variable for later use
ReplicatedMergeTreeQueue::SelectedEntry selected;
try
{
// 1: Select the entry to process
selected = queue.selectEntryToProcess(merger_mutator, *this);
}
....
LogEntryPtr & entry = selected.first;
// 2: If there is nothing to process, return immediately
if (!entry)
return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO;
// 3: Otherwise, process the selected entry
time_t prev_attempt_time = entry->last_attempt_time;
// The actual processing happens here; a lambda is passed in
bool res = queue.processEntry([this]{ return getZooKeeper(); }, entry, [&](LogEntryPtr & entry_to_process)
{
try
{
// 4: The real executor: executeLogEntry
return executeLogEntry(*entry_to_process);
}
....
});
....
}
3.4 StorageReplicatedMergeTree::executeLogEntry
Each run of the queue executing task selects only one log entry to process. After the entry is selected, StorageReplicatedMergeTree::queueTask eventually calls StorageReplicatedMergeTree::executeLogEntry to handle it.
Each log entry type is handled differently. For a GET_PART log entry, the flow is:
step1: Check whether the target data part already exists on the current replica, or is covered by an existing data part. This checks not only the active data parts but also parts in the MergeTreeDataPartState::PreCommitted state.
step2: Check whether the target data part already exists under the replica_path/parts node in ZooKeeper.
step3: If both step1 and step2 say it already exists, skip the current log entry without doing anything (executeLogEntry returns true, meaning the entry was handled successfully).
step4: Otherwise, check whether the target data part is the resulting data part of a failed write-with-quorum operation, by looking for it under zookeeper_path/quorum/failed_parts in ZooKeeper. If it is there, skip the current log entry just as in step3 (executeLogEntry returns true, meaning success).
step5: Otherwise, fetch the target data part from another replica that has it. The fetch is implemented by StorageReplicatedMergeTree::executeFetch.
The corresponding source code:
bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
{
....
if (entry.type == LogEntry::GET_PART ||
entry.type == LogEntry::MERGE_PARTS ||
entry.type == LogEntry::MUTATE_PART)
{
// 1: Check whether the target data part already exists on this replica or is covered by an existing part; this checks both active parts and parts in MergeTreeDataPartState::PreCommitted state
DataPartPtr existing_part = getPartIfExists(entry.new_part_name, {MergeTreeDataPartState::PreCommitted});
if (!existing_part)
// 2: getActiveContainingPart checks whether an existing active part on disk already covers the target part (e.g. after a merge)
existing_part = getActiveContainingPart(entry.new_part_name);
/**
3: If it exists locally, check whether the target data part also exists under the replica_path/parts node in ZooKeeper.
For example:
[zk: localhost:2181(CONNECTED) 59] ls /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/replicas/replic1/parts
[202201_0_0_0, 202201_1_1_0]
**/
if (existing_part && getZooKeeper()->exists(replica_path + "/parts/" + existing_part->name))
{
....
// 4: If the part already exists both on this replica and in ZooKeeper, skip this log entry without doing anything (executeLogEntry returns true, meaning it was handled successfully)
return true;
}
bool do_fetch = false;
switch (entry.type)
{
case LogEntry::ATTACH_PART:
/// We surely don't have this part locally as we've checked it before, so download it.
[[fallthrough]];
case LogEntry::GET_PART:
do_fetch = true;
break;
.....
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected log entry type: {}", static_cast<int>(entry.type));
}
// 5: For GET_PART entries, fetch the part
if (do_fetch)
// the actual work is done by executeFetch
return executeFetch(entry);
return true;
}
....
}
3.5 StorageReplicatedMergeTree::executeFetch
The executeFetch method handles the actual task of fetching a part.
Source code:
bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
{
/// 1: Find a replica that has a part covering the requested part
String replica = findReplicaHavingCoveringPart(entry, true);
const auto storage_settings_ptr = getSettings();
// 2: Check the parallel-fetch limits: replicated_max_parallel_fetches and
// replicated_max_parallel_fetches_for_table
static std::atomic_uint total_fetches {0};
// Check whether the fetch concurrency limit has been exceeded (across all tables)
if (storage_settings_ptr->replicated_max_parallel_fetches && total_fetches >= storage_settings_ptr->replicated_max_parallel_fetches)
{
throw Exception("Too many total fetches from replicas, maximum: " + storage_settings_ptr->replicated_max_parallel_fetches.toString(),
ErrorCodes::TOO_MANY_FETCHES);
}
++total_fetches;
SCOPE_EXIT({--total_fetches;});
// Check whether the fetch concurrency limit has been exceeded (table level)
if (storage_settings_ptr->replicated_max_parallel_fetches_for_table && current_table_fetches >= storage_settings_ptr->replicated_max_parallel_fetches_for_table)
{
throw Exception("Too many fetches from replicas for table, maximum: " + storage_settings_ptr->replicated_max_parallel_fetches_for_table.toString(),
ErrorCodes::TOO_MANY_FETCHES);
}
++current_table_fetches;
SCOPE_EXIT({--current_table_fetches;});
.....
try
{
String part_name = entry.actual_new_part_name.empty() ? entry.new_part_name : entry.actual_new_part_name;
// 3: Fetch the part
if (!fetchPart(part_name, zookeeper_path + "/replicas/" + replica, false, entry.quorum))
return false;
}
.....
}
...
return true;
}
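The fetch-throttling settings checked above are MergeTree-level settings and can be inspected via system.merge_tree_settings (a sketch; names and availability may vary by ClickHouse version):
```
SELECT name, value
FROM system.merge_tree_settings
WHERE name LIKE 'replicated_max_parallel_fetches%';
```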
Next1: The StorageReplicatedMergeTree::fetchPart method
executeFetch ultimately fetches the part by calling fetchPart, which is implemented as follows:
bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const String & source_replica_path, bool to_detached, size_t quorum)
{
auto zookeeper = zookeeper_ ? zookeeper_ : getZooKeeper();
const auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
...
LOG_DEBUG(log, "Fetching part {} from {}", part_name, source_replica_path);
/***
2022.01.05 16:29:06.940980 [ 4423 ] {} <Debug> hdp_teu_dpd_clickhousedb.hybrid_test_tongbu (4058c757-c516-4345-8058-c757c5166345): Fetching part 20220105_1_1_0 from /clickhouse/common_test_cluster/hdp_teu_dpd_clickhousedb/hybrid_test_tongbu/shard2/replicas/replic2
**/
...
std::function<MutableDataPartPtr()> get_part;
if (part_to_clone)
{
.....
}
else
{
// 1: Get the address of the replica to fetch the data from
ReplicatedMergeTreeAddress address(getZooKeeper()->get(source_replica_path + "/host"));
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(global_context);
auto user_password = global_context.getInterserverCredentials();
String interserver_scheme = global_context.getInterserverScheme();
get_part = [&, address, timeouts, user_password, interserver_scheme]()
{
...
// 2: fetchPart here mainly builds the HTTP parameters and the connection that actually pulls the data
/**
Example parameters:
source_replica_path: zookeeper_path + "/replicas/" + replica
address.host: hostname1
port: 9009
interserver_scheme:
to_detached: false
part_name: 202107_1_1_0
**/
// 3: The final call is fetcher.fetchPart
return fetcher.fetchPart(
part_name, source_replica_path,
address.host, address.replication_port,
timeouts, user_password.first, user_password.second, interserver_scheme, to_detached);
};
}
...
return true;
}
Next2: fetcher.fetchPart (in DataPartsExchange.cpp):
MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
const String & part_name,
const String & replica_path,
const String & host,
int port,
const ConnectionTimeouts & timeouts,
const String & user,
const String & password,
const String & interserver_scheme,
ThrottlerPtr throttler,
bool to_detached,
const String & tmp_prefix_,
std::optional<CurrentlySubmergingEmergingTagger> * tagger_ptr,
bool try_zero_copy,
DiskPtr disk)
{
auto part_info = MergeTreePartInfo::fromPartName(part_name, data.format_version);
const auto data_settings = data.getSettings();
// 1: Build the URI
Poco::URI uri;
uri.setScheme(interserver_scheme);
uri.setHost(host);
uri.setPort(port);
uri.setQueryParameters(
{
{"endpoint", getEndpointId(replica_path)},
{"part", part_name},
{"client_protocol_version", toString(REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION)},
{"compress", "false"}
});
Strings capability;
if (try_zero_copy && data_settings->allow_remote_fs_zero_copy_replication)
{
/*******
2: If zero-copy replication over remote storage is allowed, advertise the disk type:
capability.push_back(DiskType::toString(disk->getType()));
details omitted
***/
}
if (!capability.empty())
{
const String & remote_fs_metadata = boost::algorithm::join(capability, ", ");
uri.addQueryParameter("remote_fs_metadata", remote_fs_metadata);
}
else
{
try_zero_copy = false;
}
Poco::Net::HTTPBasicCredentials creds{};
if (!user.empty())
{
creds.setUsername(user);
creds.setPassword(password);
}
PooledReadWriteBufferFromHTTP in{
uri,
Poco::Net::HTTPRequest::HTTP_POST,
{},
timeouts,
creds,
DBMS_DEFAULT_BUFFER_SIZE,
0, /* no redirects */
data_settings->replicated_max_parallel_fetches_for_host
};
int server_protocol_version = parse<int>(in.getResponseCookie("server_protocol_version", "0"));
ReservationPtr reservation;
size_t sum_files_size = 0;
// 3: Read the total size of the files under the part directory on the server side
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE)
{
readBinary(sum_files_size, in);
....
}
.....
bool sync = (data_settings->min_compressed_bytes_to_fsync_after_fetch
&& sum_files_size >= data_settings->min_compressed_bytes_to_fsync_after_fetch);
String part_type = "Wide";
// 4: Read the part_type of the server-side part
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_TYPE)
readStringBinary(part_type, in);
// 5: Read the part_uuid of the server-side part
UUID part_uuid = UUIDHelpers::Nil;
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID)
readUUIDText(part_uuid, in);
String remote_fs_metadata = parse<String>(in.getResponseCookie("remote_fs_metadata", ""));
if (!remote_fs_metadata.empty())
{
// 6: If the server uses remote storage such as S3 or HDFS, go through the metadata-sync path (this acts like a handshake: both sides must use a zero-copy storage type for the metadata-only sync to be taken)
try
{
return downloadPartToDiskRemoteMeta(part_name, replica_path, to_detached, tmp_prefix_, disk, in, throttler);
}
....
}
auto storage_id = data.getStorageID();
// 7: Build the path where the part will be stored on this replica
String new_part_path = part_type == "InMemory" ? "memory" : fs::path(data.getFullPathOnDisk(disk)) / part_name / "";
auto entry = data.getContext()->getReplicatedFetchList().insert(
storage_id.getDatabaseName(), storage_id.getTableName(),
part_info.partition_id, part_name, new_part_path,
replica_path, uri, to_detached, sum_files_size);
in.setNextCallback(ReplicatedFetchReadCallback(*entry));
size_t projections = 0;
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION)
readBinary(projections, in);
MergeTreeData::DataPart::Checksums checksums;
// 8: Depending on the part_type, trigger the corresponding download logic; usually downloadPartToDisk
return part_type == "InMemory"
? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, context, disk, in, projections, throttler)
: downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, sync, disk, in, projections, checksums, throttler);
}
Next3: downloadPartToDisk (writes the part to the local disk)
MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
const String & part_name,
const String & replica_path,
bool to_detached,
const String & tmp_prefix_,
bool sync,
DiskPtr disk,
PooledReadWriteBufferFromHTTP & in,
size_t projections,
MergeTreeData::DataPart::Checksums & checksums,
ThrottlerPtr throttler)
{
// 1: Define the temporary directory prefix
static const String TMP_PREFIX = "tmp-fetch_";
String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;
.........
String part_relative_path = String(to_detached ? "detached/" : "") + tmp_prefix + part_name;
// 2: Build the download destination path; note that it points to the temporary directory
String part_download_path = data.getRelativeDataPath() + part_relative_path + "/";
// 3: If the path already exists on the local disk, remove the old temporary directory first
if (disk->exists(part_download_path))
{
LOG_WARNING(log, "Directory {} already exists, probably result of a failed fetch. Will remove it before fetching part.",
fullPath(disk, part_download_path));
disk->removeRecursive(part_download_path);
}
// 4: Create the temporary directory, e.g. /data00/clickhouse/store/85f/85f61e82-f5cc-429b-85f6-1e82f5cc229b/temp_20220103_0_5_1
disk->createDirectories(part_download_path);
.....
// 5: Start downloading the base part
downloadBaseOrProjectionPartToDisk(replica_path, part_download_path, sync, disk, in, checksums, throttler);
....
}
Next4: downloadBaseOrProjectionPartToDisk
void Fetcher::downloadBaseOrProjectionPartToDisk(
const String & replica_path,
const String & part_download_path,
bool sync,
DiskPtr disk,
PooledReadWriteBufferFromHTTP & in,
MergeTreeData::DataPart::Checksums & checksums,
ThrottlerPtr throttler) const
{
size_t files;
readBinary(files, in);
// 1: Iterate over the files in the part directory on the server side
for (size_t i = 0; i < files; ++i)
{
String file_name;
UInt64 file_size;
readStringBinary(file_name, in);
readBinary(file_size, in);
.....
// 2: Create the output file on the target disk
auto file_out = disk->writeFile(fs::path(part_download_path) / file_name);
HashingWriteBuffer hashing_out(*file_out);
// 3: Copy the data into it
copyDataWithThrottler(in, hashing_out, file_size, blocker.getCounter(), throttler);
.....
// 4: Add the fetched file's info to the checksums (checksums.txt itself, columns.txt, and the default compression codec file are excluded)
if (file_name != "checksums.txt" &&
file_name != "columns.txt" &&
file_name != IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME)
checksums.addFile(file_name, file_size, expected_hash);
if (sync)
hashing_out.sync();
}
}