clickhouse写入和副本同步过程

一: 概述 Clickhouse 插入数据

Clickhouse 插入数据过程

当需要在ReplicatedMergeTree中执行INSERT以写入数据时,即会进入INSERT核心流程,整个流程从上至下按照时间顺序进行。

NEXT-1: 创建副本

副本1:

create table test_replicated(
id Int8,
ctime DateTime,
name String
)ENGINE = ReplicatedMergeTree('/clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated','replic1')
partition by toYYYYMM(ctime)
order by id;

副本2:

create table test_replicated(
id Int8,
ctime DateTime,
name String
)ENGINE = ReplicatedMergeTree('/clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated','replic2')
partition by toYYYYMM(ctime)
order by id;

在创建的过程中,ReplicatedMergeTree会进行一些初始化操作。

  • 根据zk_path初始化所有的ZooKeeper节点。

    表的zk_path:
    ls /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated 
    
    [alter_partition_version, block_numbers, blocks, columns, leader_election, log, metadata, mutations, nonincrement_block_numbers, part_moves_shard, pinned_part_uuids, quorum, replicas, temp, zero_copy_hdfs, zero_copy_s3]
    
  • 在/replicas/节点下注册自己的副本实例replic1或replic2

replic1的zk path:
ls /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/replicas/replic1

[columns, flags, host, is_active, is_lost, log_pointer, max_processed_insert_time, metadata, metadata_version, min_unprocessed_insert_time, mutation_pointer, parts, queue]
  • 启动监听任务,监听/log日志节点。
[zk: 10.178.19.198:2181(CONNECTED) 24] ls /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/log 
[log-0000000000]

NEXT-2: 向第一个副本实例写入数据

1) 术语

Block数据块:insert依据max_insert_block_size的大小(默认1048576行)将数据切分成若干个Block数据块。因此Block数据块是数据写入的基本单元,并且具有写入的原子性和唯一性。

每个压缩数据块的体积,按照其压缩前的数据字节大小,被严格控制在64KB~1MB之间,上下限大小由min_compress_block_size(默认65536)max_compress_block_size(默认1048576)参数指定。而每一个压缩数据块最终大小,则和一个index_granularity内实际的数据大小有关。

  • 单个索引粒度间隔数据size < 64KB:如果单个索引粒度数据大小小于64KB,则继续获取下一个索引粒度的数据,一直到size >= 64KB,生成下一个压缩数据块。

  • 单个索引粒度间隔数据 64KB <= size <= 1MB:如果单个索引粒度数据大小大于64KB,小于1MB,则直接生成下一个压缩数据块

  • 单个索引粒度间隔数据 size > 1MB:如果单个索引粒度数据大小超过1MB,则先按照1MB大小截断并生成下一个压缩数据块,剩余数据按照这三个规则对应执行。这时就会出现一批数据生成多个压缩数据块的情况。


    clickhouse数据块.jpg

2) 插入数据

replic1写入如下数据:

insert into test_replicated values(1,now(),'zs'),(2,now(),'ls');                       

插入数据后如下zk会有如下数据:

1: /blocks
ls /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/blocks 
[202201_5253379567567382390_63093905895384417]

get /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/blocks/202201_5253379567567382390_63093905895384417 
202201_0_0_0


2: log
ls /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/log
[log-0000000000]

get /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/log/log-0000000000 

format version: 4
create_time: 2022-01-07 21:37:16
source replica: replic1
block_id: 202201_5253379567567382390_63093905895384417
get
202201_0_0_0
part_type: Compact

NEXT-3: 第二个副本实例拉取Log日志

replic2副本会一直监听/log节点变化,当replic1推送log/log-0000000000后,replic2会触发日志的拉取任务并更新log_pointer

get /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/replicas/replic1/log_pointer 
1

replic2 在拉取了LogEntry之后,它并不会直接执行,而是将其转为任务对象放至队列:

ls /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/replicas/replic2/queue 
queue-0000000001

NEXT-4: 第二个副本实例向其他副本发起下载请求

replic2基于/queue队列开始执行任务。当看到type类型为get的时候,ReplicatedMerge-Tree会明白此时在远端的其他副本中已经成功写入了数据分区,而自己需要同步这些数据。

replic2上的第二个副本实例会开始选择一个远端的其他副本作为数据的下载来源。远端副本的选择算法大致是这样的:
(1)从/replicas节点拿到所有的副本节点。
(2)遍历这些副本,选取其中一个。选取的副本需要拥有最大的log_pointer下标,并且/queue子节点数量最少。log_pointer下标最大,意味着该副本执行的日志最多,数据应该更加完整;而/queue最小,则意味着该副本目前的任务执行负担较小。

例如:
Sending request to http://replic1.clickhouse-headless.cluster.local:9009/?endpoint=DataPartsExchange……

NEXT-5 第一个副本实例响应数据下载

replic1的DataPartsExchange端口服务接收到调用请求,在得知对方来意之后,根据参数做出响应,将本地分区202107_0_0_0基于DataPartsExchang的服务响应发送回replic1

NEXT-6 第二个副本实例下载数据并完成本地写入

首先将其写至临时目录

```
tmp_fetch_202107_1_1_0
```

待全部数据接收完成之后,重命名该目录

```
Renaming temporary part tmp_fetch_202107_1_1_0 to 202107_1_1_0.
```

二: Clickhouse副本同步过程

2.1 哪些操作会引发数据同步?

数据同步包括了元数据和内容数据的同步。在ClickHouse中,数据插入(INSERT),元数据变更(ALTER),内容数据变更(MUTATION),合并(MERGE)等操作都会引发数据同步。

其中MERGEMUTATION操作 只能由主副本主导, 主副本选举过程如下:

/leader_election**:用于主副本的选举工作,主副本会主导`MERGE`和`MUTATION`操作(`ALTER DELETE`和`ALTER UPDATE`)。这些任务在主副本完成之后再借助ZooKeeper将消息事件分发至其他副本

主副本选举的方式是向/leader_election/插入子节点,第一个插入成功的副本 就是主副本

2.2: INSERT同步

clickhouse执行ReplicatedMergeTree表的插入操作时,会将数据以data part为单位写入到磁盘/内存里,每个data part写入完成后都会做一个commit操作, 这个commit操作会生成一个GET_PART类型的log entry并上传到zookeeper中供其他副本拉取同步。

由此可见,INSERT语句引发的是data part级别的内容数据的同步。

GET_PART类型的log entry 文件内容demo:

format version: 4
create_time: 2022-01-07 21:37:16
source replica: replic1
block_id: 202201_5253379567567382390_63093905895384417
get
202201_0_0_0
part_type: Compact

2.3 同步日志和副本队列(log, queue)

2.3.1 同步日志(log)

log位于zookeeper_path下面,用于保存所有副本间的数据同步日志(称为log entry),ClickHouse支持的LogEntry类型有:

img

笔记:

每插入一个数据  都会生成一个entity log:
[zk: localhost:2181(CONNECTED) 20] ls /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/log
[log-0000000000, log-0000000001]

原理: 在一个副本上执行insert into ...操作会上传一个GET_PART类型的log entry到log节点下。
zookeeper_path/log下的log entry在zookeeper上的znode命名规范是log-seqNum, 其中seqNum为创建sequential znode生成的自增序号,如log-0000000001。

log entity 下有什么东西:

[zk: localhost:2181(CONNECTED) 22] get /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/log/log-0000000001

format version: 4
create_time: 2022-01-07 22:08:20
source replica: replic1  //数据源
block_id: 202201_4569856920166365182_7768759302129368290  // 分区的block_id
get
202201_1_1_0  //从那个part下载
part_type: Compact

2.3.2 副本队列 (queue)

queue位于replica_path下面,保存了当前副本需要同步的log entries. queue里面的log entries是从log中拉取的(由queue updating task 任务负责拉取),被所有副本都拉取过的log entry就可以从log中删除掉, 删除操作由后台清理线程执行

replica_path/queue下的entry在zookeeper上的znode命名规范是queue-seqNum, 其中seqNum为创建sequential znode生成的自增序号,如queue-0000000001。

replica_path下还有一个log_pointer节点,保存了当前副本从log拉取到的最大log entry id + 1,即下一个需要拉取的log entry的id.

[zk:localhost:2181(CONNECTED) 11] get /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/replicas/replic1/log_pointer   //副本2 
2
[zk:localhost:2181(CONNECTED) 12] get /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/replicas/replic2/log_pointer    // 副本1
2

3 负责数据同步的tasks

每个副本节点都有一系列的负责数据同步的任务,这些任务会与zookeeper交互,获取最新的同步任务,然后在本地内存中维护任务队列并借助任务池调度执行,执行完成后会更新zookeer上的对应状态。下面我们逐一介绍各类同步任务。

3.1 queue updating task

queue updating task负责当前副本上同步日志(log entry)队列的更新。

ReplicatedMergeTree表对应的StoreageReplicatedMergeTree类中有一个queue_updating_task对象,它是由后台调度池(BackgroundSchedulePool)调度执行的任务,其执行的函数是

StorageReplicatedMergeTree::queueUpdatingTask()

queue updating task做的事情包括:
1. 从zookeeper拉取zookeeper_path/log下的所有log entries,并从replica_path/log_pointer获取当前副本的log_pointer (即下一个需要拉取的log entry的id)。

2. 根据log_pointer在log entries中定位到需要拉取的entries,如果log_pointer为空,则从log entries中最小的entry开始拉取。

3. 将需要拉取的log entries都同步到replica_path/queue下面,这里的同步就是以选定的log entries的内容为节点内容,一个一个地在replica_path/queue下创建新的sequential znode. 注意,这里创建的znode名称并不会和zookeeper_path/log下的一致。

4. 更新replica_path/log_pointer的内容为拉取到的log entries中最大的entry id + 1.

5. 将拉取到的log entries插入到内存中的同步任务队列里,即插入到ReplicatedMergeTreeQueue::queue。

6. 触发queue executing task(下文会介绍)执行。

这些步骤的实现细节请见源码ReplicatedMergeTreeQueue::pullLogsToQueue.

void StorageReplicatedMergeTree::queueUpdatingTask()
{
    ....
    try
    {
        queue.pullLogsToQueue(getZooKeeper(), queue_updating_task->getWatchCallback());
        .....
    }
    catch (const Coordination::Exception & e)
    {
       ....
    }
    ....
}

3.2 ReplicatedMergeTreeQueue::queue 队列任务

StorageReplicatedMergeTree::queueUpdatingTask() 会将拉取到的log entries插入到内存中的同步任务队列里.

3.3 queue executing task

queue updating task把同步日志(log entries)拉取到本地内存后,我们需要一个任务去执行这些log entries. 这个任务就是queue executing task.

queue executing 最终调用执行的函数是 StorageReplicatedMergeTree::queueTask

下面介绍queueTask函数的主要执行步骤。

step1: 选择要处理的log entry

对于GET_PART类型的log entry,如果它生成的data part被某个正在执行的log entry的resulting data parts所包含,则当前entry此轮不被选择。

step2: 处理选中的log entry

选中log entry后,StorageReplicatedMergeTree::queueTask函数会调用ReplicatedMergeTreeQueue::processEntry对log entry进行处理。processEntry函数的第三个参数是用于执行log entry的函数,此处传入的是StorageReplicatedMergeTree::executeLogEntry

源码:

BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::queueTask()
{
    ....
    // 对正在执行的entry用selected变量来表示,方便以后处理
    ReplicatedMergeTreeQueue::SelectedEntry selected;
    try
    {
        // 1: 选择需要处理的entrys
        selected = queue.selectEntryToProcess(merger_mutator, *this);
    }
    ....
    LogEntryPtr & entry = selected.first;
    // 2: 如果没有需要处理的直接返回
    if (!entry)
        return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO;
    // 3: 有需要处理的执行以下代码
    time_t prev_attempt_time = entry->last_attempt_time;

    // 这里是真正的处理方法,并且传入了一个匿名方法
    bool res = queue.processEntry([this]{ return getZooKeeper(); }, entry, [&](LogEntryPtr & entry_to_process)
    {
        try
        {
            // 4: 这里是真正的执行器executeLogEntry
            return executeLogEntry(*entry_to_process);
        }
        ....
    });
    ....
}

3.4 StorageReplicatedMergeTree::executeLogEntry

每次queue executing task执行只会选择一个log entry进行处理。选中log entry后,StorageReplicatedMergeTree::queueTask函数会最终会调用StorageReplicatedMergeTree::executeLogEntry进行处理

每个log entiry的处理逻辑不一样, 本节对于GET_PART类型的log entry,处理流程如下:

step1: 判断目标data part在当前副本是否已经存在或被已存在的data part所包含,这里不仅会检查active data parts,也会检查处于MergeTreeDataPartState::PreCommitted状态的data parts。

step2: 判断目标data part在zookeeper的replica_path/parts节点下是否已经存在。

step3: 如果step1和step2的判断结果都是已存在,则直接跳过当前log entry,不做处理(此时executeLogEntry返回true,表示处理成功)。

step4: 否则,进一步判断目标data part是否是某次失败的write with quorum操作的resulting data part,判断方法是查看zookeeper上zookeeper_path/quorum/failed_parts节点下是否存在目标data part。如果存在,则和2.3一样,直接跳过当前log entry,不做处理(此时executeLogEntry返回true,表示处理成功)。

step5: 否则,从其他有目标data part的副本节点去拉取目标data part。拉取是调用StorageReplicatedMergeTree::executeFetch实现的。

以上过程对应源码如下:

bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
{
....
if (entry.type == LogEntry::GET_PART ||
        entry.type == LogEntry::MERGE_PARTS ||
        entry.type == LogEntry::MUTATE_PART)
    {
        //1:判断目标data part在当前副本是否已经存在或被已存在的data part所包含,这里不仅会检查active data parts,也会检查处于MergeTreeDataPartState::PreCommitted状态的data parts
        DataPartPtr existing_part = getPartIfExists(entry.new_part_name, {MergeTreeDataPartState::PreCommitted});
        if (!existing_part)
       // 2: getActiveContainingPart 判断分区是否已经 merge 到磁盘
            existing_part = getActiveContainingPart(entry.new_part_name);
        /**
        3: 本地存在的话,判断目标data part在zookeeper的*replica_path/parts*节点下是否已经存在。
        例如:
  [zk: localhost:2181(CONNECTED) 59] ls /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/replicas/replic1/parts 
[202201_0_0_0, 202201_1_1_0]
        *
        **/
        if (existing_part && getZooKeeper()->exists(replica_path + "/parts/" + existing_part->name))
        {
            ....
            // 4: 如果 part在当前副本是否已经存在或zk已经存在,则直接跳过当前log entry,不做处理(此时executeLogEntry返回true,表示处理成功
            return true;
        }
        
    bool do_fetch = false;

    switch (entry.type)
    {
        case LogEntry::ATTACH_PART:
            /// We surely don't have this part locally as we've checked it before, so download it.
            [[fallthrough]];
        case LogEntry::GET_PART:
            do_fetch = true;
            break;
        .....
        default:
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected log entry type: {}", static_cast<int>(entry.type));
    }
    // 5: 对于get类型part, 则直接调用 fetch  
    if (do_fetch)
        // 实际调用的是executeFetch 
        return executeFetch (entry);
    return true;

3.4 StorageReplicatedMergeTree::executeFetch

executeFetch方法是处理拉取part的拉取任务

源码如下:

bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
{
    /// 1: 查找是否有需要覆盖的part
    String replica = findReplicaHavingCoveringPart(entry, true);
    const auto storage_settings_ptr = getSettings();

    // 2: 设置一些并行参数,判断replicated_max_parallel_fetches和
    // replicated_max_parallel_fetches_for_table是否符合要求
    static std::atomic_uint total_fetches {0};
    // 判断是否超过拉取次数限制(storage级别)
    if (storage_settings_ptr->replicated_max_parallel_fetches && total_fetches >= storage_settings_ptr->replicated_max_parallel_fetches)
    {
        throw Exception("Too many total fetches from replicas, maximum: " + storage_settings_ptr->replicated_max_parallel_fetches.toString(),
            ErrorCodes::TOO_MANY_FETCHES);
    }

    ++total_fetches;
    SCOPE_EXIT({--total_fetches;});
    // 判断是否超过拉取次数限制(table级别)
    if (storage_settings_ptr->replicated_max_parallel_fetches_for_table && current_table_fetches >= storage_settings_ptr->replicated_max_parallel_fetches_for_table)
    {
        throw Exception("Too many fetches from replicas for table, maximum: " + storage_settings_ptr->replicated_max_parallel_fetches_for_table.toString(),
            ErrorCodes::TOO_MANY_FETCHES);
    }

    ++current_table_fetches;
    SCOPE_EXIT({--current_table_fetches;});
   .....
        try
        {
            String part_name = entry.actual_new_part_name.empty() ? entry.new_part_name : entry.actual_new_part_name;
            // 3: 拉取part
            if (!fetchPart(part_name, zookeeper_path + "/replicas/" + replica, false, entry.quorum))
                return false;
        }
        .....
    }
    ...
    return true;
}

Next1-StorageReplicatedMergeTree::fetchPart 方法

executeFetch 最终拉取part 调用的是fetchPart 方法, fetchPart 方法实现如下:

bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const String & source_replica_path, bool to_detached, size_t quorum)
{
    auto zookeeper = zookeeper_ ? zookeeper_ : getZooKeeper();
    const auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
    ...
    LOG_DEBUG(log, "Fetching part {} from {}", part_name, source_replica_path);
    /***
    2022.01.05 16:29:06.940980 [ 4423 ] {} <Debug> hdp_teu_dpd_clickhousedb.hybrid_test_tongbu (4058c757-c516-4345-8058-c757c5166345): Fetching part 20220105_1_1_0 from /clickhouse/common_test_cluster/hdp_teu_dpd_clickhousedb/hybrid_test_tongbu/shard2/replicas/replic2
    **/
    ...
    std::function<MutableDataPartPtr()> get_part;
    if (part_to_clone)
    {
        .....
    }
    else
    {
        // 1: 获取需要clone数据的副本地址
        ReplicatedMergeTreeAddress address(getZooKeeper()->get(source_replica_path + "/host"));
        auto timeouts = ConnectionTimeouts::getHTTPTimeouts(global_context);
        auto user_password = global_context.getInterserverCredentials();
        String interserver_scheme = global_context.getInterserverScheme();

        get_part = [&, address, timeouts, user_password, interserver_scheme]()
        {
            ...
            // 2: 这里的fetchPart主要就是构造HTTP参数及连接真正拉取数据
            /**
            参数 demo:  
            source_replica_path: zookeeper_path + "/replicas/" + replica
            address.host:hostname1
            port: 9009 
            interserver_scheme: 
            to_detached: false
            part_name : 202107_1_1_0
            **/
            // 3: 最终调用的是 fetcher.fetchPart 方法
            return fetcher.fetchPart(
                part_name, source_replica_path,
                address.host, address.replication_port,
                timeouts, user_password.first, user_password.second, interserver_scheme, to_detached);
        };
    }
    ...
    return true;
}

Next2-fetcher.fetchPart 代码如下(DataPartsExchange.cpp):

MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
    const StorageMetadataPtr & metadata_snapshot,
    ContextPtr context,
    const String & part_name,
    const String & replica_path,
    const String & host,
    int port,
    const ConnectionTimeouts & timeouts,
    const String & user,
    const String & password,
    const String & interserver_scheme,
    ThrottlerPtr throttler,
    bool to_detached,
    const String & tmp_prefix_,
    std::optional<CurrentlySubmergingEmergingTagger> * tagger_ptr,
    bool try_zero_copy,
    DiskPtr disk)
{

    auto part_info = MergeTreePartInfo::fromPartName(part_name, data.format_version);
    const auto data_settings = data.getSettings();
    // 1: 设置 uri
    Poco::URI uri;
    uri.setScheme(interserver_scheme);
    uri.setHost(host);
    uri.setPort(port);
    uri.setQueryParameters(
    {
        {"endpoint",                getEndpointId(replica_path)},
        {"part",                    part_name},
        {"client_protocol_version", toString(REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION)},
        {"compress",                "false"}
    });

    Strings capability;
    if (try_zero_copy && data_settings->allow_remote_fs_zero_copy_replication)
    {
       /*******
       2: 判断是如果是zero_copy 添加:
       capability.push_back(DiskType::toString(disk->getType()));
       细节略
       ***/
        
    }
    if (!capability.empty())
    {
        const String & remote_fs_metadata = boost::algorithm::join(capability, ", ");
        uri.addQueryParameter("remote_fs_metadata", remote_fs_metadata);
    }
    else
    {
        try_zero_copy = false;
    }

    Poco::Net::HTTPBasicCredentials creds{};
    if (!user.empty())
    {
        creds.setUsername(user);
        creds.setPassword(password);
    }

    PooledReadWriteBufferFromHTTP in{
        uri,
        Poco::Net::HTTPRequest::HTTP_POST,
        {},
        timeouts,
        creds,
        DBMS_DEFAULT_BUFFER_SIZE,
        0, /* no redirects */
        data_settings->replicated_max_parallel_fetches_for_host
    };
   
    int server_protocol_version = parse<int>(in.getResponseCookie("server_protocol_version", "0"));

    ReservationPtr reservation;
    size_t sum_files_size = 0;
    // 3: 获取服务端part目下文件的累计大小
    if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE)
    {
        readBinary(sum_files_size, in);
        ....
    }
    .....

    bool sync = (data_settings->min_compressed_bytes_to_fsync_after_fetch
                    && sum_files_size >= data_settings->min_compressed_bytes_to_fsync_after_fetch);

    String part_type = "Wide";
    // 4: 读取server端part的part_type
    if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_TYPE)
        readStringBinary(part_type, in);
    // 5: 读取server端part的part_uuid
    UUID part_uuid = UUIDHelpers::Nil;
    if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID)
        readUUIDText(part_uuid, in);

    String remote_fs_metadata = parse<String>(in.getResponseCookie("remote_fs_metadata", ""));
    if (!remote_fs_metadata.empty())
    {
       // 6: 如果是远程服务端走元数据同步逻辑,如s3或者 hdfs (相当于握手协议,双方都是zero-copy存储类型才可以走元数据同步逻辑)
        try
        {   
            return downloadPartToDiskRemoteMeta(part_name, replica_path, to_detached, tmp_prefix_, disk, in, throttler);
        }
        ....
     }
    auto storage_id = data.getStorageID();
    // 7: 构建副本的part存放路径
     String new_part_path = part_type == "InMemory" ? "memory" : fs::path(data.getFullPathOnDisk(disk)) / part_name / "";
    
    auto entry = data.getContext()->getReplicatedFetchList().insert(
        storage_id.getDatabaseName(), storage_id.getTableName(),
        part_info.partition_id, part_name, new_part_path,
        replica_path, uri, to_detached, sum_files_size);

    in.setNextCallback(ReplicatedFetchReadCallback(*entry));

    size_t projections = 0;
    if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION)
        readBinary(projections, in);

    MergeTreeData::DataPart::Checksums checksums;
    8: 构建副本的part_type 触发不同的下载逻辑,一般是downloadPartToDisk
    return part_type == "InMemory"
        ? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, context, disk, in, projections, throttler)
        : downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, sync, disk, in, projections, checksums, throttler);
}

Next3-downloadPartToDisk:(写入本地磁盘的方法)

MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
    const String & part_name,
    const String & replica_path,
    bool to_detached,
    const String & tmp_prefix_,
    bool sync,
    DiskPtr disk,
    PooledReadWriteBufferFromHTTP & in,
    size_t projections,
    MergeTreeData::DataPart::Checksums & checksums,
    ThrottlerPtr throttler)
{
   // 1: 定义临时目录前缀
    static const String TMP_PREFIX = "tmp-fetch_";
    String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;
    .........
    String part_relative_path = String(to_detached ? "detached/" : "") + tmp_prefix + part_name;
    // 2: 构建下载后的存储路径 注意是写到临时目录
    String part_download_path = data.getRelativeDataPath() + part_relative_path + "/";
    // 3: 如果本地磁盘下存在该路径,将以前的临时目录删除
    if (disk->exists(part_download_path))
    {
        LOG_WARNING(log, "Directory {} already exists, probably result of a failed fetch. Will remove it before fetching part.",
            fullPath(disk, part_download_path));
        disk->removeRecursive(part_download_path);
    }
    // 4: 创建临时目录: 例如:/data00/clickhouse/store/85f/85f61e82-f5cc-429b-85f6-1e82f5cc229b/temp_20220103_0_5_1
    disk->createDirectories(part_download_path);
    .....
    // 5: 开始下载 Download the base part
    downloadBaseOrProjectionPartToDisk(replica_path, part_download_path, sync, disk, in, checksums, throttler);
    ....
}

// Next4-downloadBaseOrProjectionPartToDisk

void Fetcher::downloadBaseOrProjectionPartToDisk(
    const String & replica_path,
    const String & part_download_path,
    bool sync,
    DiskPtr disk,
    PooledReadWriteBufferFromHTTP & in,
    MergeTreeData::DataPart::Checksums & checksums,
    ThrottlerPtr throttler) const
{
    size_t files;
    readBinary(files, in);
    //1: 遍历服务端part目录下的文件
    for (size_t i = 0; i < files; ++i)
    {
        String file_name;
        UInt64 file_size;

        readStringBinary(file_name, in);
        readBinary(file_size, in);
        .....
        //2: 创建out 目标
        auto file_out = disk->writeFile(fs::path(part_download_path) / file_name);
        HashingWriteBuffer hashing_out(*file_out);
        //3: copy 写入
        copyDataWithThrottler(in, hashing_out, file_size, blocker.getCounter(), throttler);
        .....
        // 4: 将同步过来的文件 信息添加到checksums.txt
        if (file_name != "checksums.txt" &&
            file_name != "columns.txt" &&
            file_name != IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME)
            checksums.addFile(file_name, file_size, expected_hash);
        if (sync)
            hashing_out.sync();
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,530评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,403评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,120评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,770评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,758评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,649评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,021评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,675评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,931评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,751评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,410评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,004评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,969评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,042评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,493评论 2 343

推荐阅读更多精彩内容