Saturday night,继续超短文模式。
在ClickHouse集群中,我们可以在DDL语句上附加ON CLUSTER <cluster_name>的语法,使得该DDL语句只需执行一次,即可在集群中所有实例上都执行,简单方便。每执行一条分布式DDL,都会在配置文件中<distributed_ddl><path>指定的ZooKeeper路径上写一条执行记录(路径默认为/clickhouse/task_queue/ddl)。如下图所示。
但是,这个队列中的记录默认似乎不会被及时清理,造成znode不断增长,官方文档中也没有提供对应的参数来控制。考虑到手动删除znode可能会有风险,遂去ClickHouse源码中寻找蛛丝马迹,最终在dbms/src/Interpreters/DDLWorker.h里找到如下定义:
/// Minimum pause between two cleanups of the DDL queue, in seconds (default: one minute).
/// A cleanup is only attempted after a new-node event arrives, and that attempt is skipped
/// if the previous cleanup happened fewer than this many seconds ago.
Int64 cleanup_delay_period{60};
/// Maximum age of a queue entry, in seconds (default: one week); older entries get deleted.
Int64 task_max_lifetime{7 * 24 * 60 * 60};
/// Upper bound on the number of entries kept in the queue; the oldest surplus entries get deleted.
size_t max_tasks_in_queue{1000};
- cleanup_delay_period:两次清理DDL记录之间的最小间隔,单位为秒,默认60秒。注意清理只会在收到新DDL任务的事件时触发,若距上次清理不足该间隔,则本次跳过。
- task_max_lifetime:分布式DDL记录可以保留的最大时长,单位为秒,默认保留7天。
- max_tasks_in_queue:分布式DDL队列中可以保留的最大记录数,默认为1000条。
将以上参数加入config.xml的<distributed_ddl>一节即可。例如,下面的配置将DDL记录的保留时长缩短为1天(86400秒),并将队列中最多保留的记录数降为200条:
<distributed_ddl>
<!-- Path in ZooKeeper to queue with DDL queries -->
<path>/clickhouse/task_queue/ddl</path>
<!-- Minimum number of seconds between two cleanups of the queue (source default: 60) -->
<cleanup_delay_period>60</cleanup_delay_period>
<!-- Queue entries older than this many seconds are deleted; 86400 = 1 day (source default: 7 days) -->
<task_max_lifetime>86400</task_max_lifetime>
<!-- Keep at most this many entries; the oldest surplus entries are deleted (source default: 1000) -->
<max_tasks_in_queue>200</max_tasks_in_queue>
</distributed_ddl>
ClickHouse内部有专门的线程来清理DDL队列,具体逻辑位于DDLWorker.cpp中,并不复杂,代码摘录如下。
/// Body of the background thread that trims the distributed DDL queue in ZooKeeper.
///
/// The thread blocks on cleanup_event; every wake-up is treated as a cleanup request.
/// The first request is never throttled. After a successful cleanup, further requests are
/// honoured only once cleanup_delay_period seconds have passed. A request that arrives too
/// early is dropped, and the thread simply waits for the next event. Exceptions thrown inside
/// the loop are logged with tryLogCurrentException and do not leave the thread. The loop
/// terminates once stop_flag is set.
void DDLWorker::runCleanupThread()
{
    setThreadName("DDLWorkerClnr");
    LOG_DEBUG(log, "Started DDLWorker cleanup thread");

    /// 0 means "no cleanup has been performed yet", which disables the throttling check below.
    Int64 last_cleanup_time_seconds = 0;

    while (!stop_flag)
    {
        try
        {
            cleanup_event->wait();
            if (stop_flag)
                break;

            Int64 current_time_seconds = Poco::Timestamp().epochTime();
            if (last_cleanup_time_seconds && current_time_seconds < last_cleanup_time_seconds + cleanup_delay_period)
            {
                LOG_TRACE(log, "Too early to clean queue, will do it later.");
                continue;
            }

            /// NOTE(review): the `try` prefix suggests tryGetZooKeeper() may return an empty
            /// pointer (e.g. before a session exists). Dereferencing it would crash the process,
            /// and `catch (...)` below cannot intercept that, so guard against it explicitly.
            auto zookeeper = tryGetZooKeeper();
            if (!zookeeper || zookeeper->expired())
                continue;

            cleanupQueue(current_time_seconds, zookeeper);
            last_cleanup_time_seconds = current_time_seconds;
        }
        catch (...)
        {
            tryLogCurrentException(log, __PRETTY_FUNCTION__);
        }
    }
}
/// Deletes stale entries from the distributed DDL queue (children of queue_dir) in ZooKeeper.
///
/// An entry is stale if either:
///   - it was created more than task_max_lifetime seconds before current_time_seconds, or
///   - it falls among the entries exceeding the max_tasks_in_queue limit. These are the
///     leading entries after filterAndSortQueueNodes, which is expected to put the oldest first.
/// A stale entry is skipped, and left for a later pass, when "<entry>/active" has children
/// or when its "lock" child cannot be acquired. An error on one entry is logged and the scan
/// continues with the next entry. The scan stops early when stop_flag is set.
///
/// @param current_time_seconds  "Now" in Unix seconds; used for the lifetime check.
/// @param zookeeper             Session used for every read and deletion.
void DDLWorker::cleanupQueue(Int64 current_time_seconds, const ZooKeeperPtr & zookeeper)
{
    LOG_DEBUG(log, "Cleaning queue");

    Strings queue_nodes = zookeeper->getChildren(queue_dir);
    filterAndSortQueueNodes(queue_nodes);

    /// Entries before first_non_outdated_node are over the max_tasks_in_queue limit.
    size_t num_outdated_nodes = (queue_nodes.size() > max_tasks_in_queue) ? queue_nodes.size() - max_tasks_in_queue : 0;
    /// const_iterator, so it compares with the loop iterator without mixing iterator kinds.
    auto first_non_outdated_node = queue_nodes.cbegin() + num_outdated_nodes;

    for (auto it = queue_nodes.cbegin(); it < queue_nodes.cend(); ++it)
    {
        if (stop_flag)
            return;

        /// queue_nodes is not modified inside the loop, so binding a reference is safe and avoids a copy.
        const String & node_name = *it;
        String node_path = queue_dir + "/" + node_name;
        String lock_path = node_path + "/lock";

        Coordination::Stat stat;

        try
        {
            /// Already deleted
            if (!zookeeper->exists(node_path, &stat))
                continue;

            /// Delete node if its lifetime is expired (according to task_max_lifetime parameter).
            /// stat.ctime is divided by 1000 to get seconds comparable with current_time_seconds.
            constexpr UInt64 zookeeper_time_resolution = 1000;
            Int64 zookeeper_time_seconds = stat.ctime / zookeeper_time_resolution;
            bool node_lifetime_is_expired = zookeeper_time_seconds + task_max_lifetime < current_time_seconds;

            /// If too many nodes in task queue (> max_tasks_in_queue), delete oldest one
            bool node_is_outside_max_window = it < first_non_outdated_node;

            if (!node_lifetime_is_expired && !node_is_outside_max_window)
                continue;

            /// Skip if there are active nodes (it is weak guard)
            if (zookeeper->exists(node_path + "/active", &stat) && stat.numChildren > 0)
            {
                LOG_INFO(log, "Task " << node_name << " should be deleted, but there are active workers. Skipping it.");
                continue;
            }

            /// Usage of the lock is not necessary now (tryRemoveRecursive correctly removes node in a presence of concurrent cleaners)
            /// But the lock will be required to implement system.distributed_ddl_queue table
            auto lock = createSimpleZooKeeperLock(zookeeper, node_path, "lock", host_fqdn_id);
            if (!lock->tryLock())
            {
                LOG_INFO(log, "Task " << node_name << " should be deleted, but it is locked. Skipping it.");
                continue;
            }

            if (node_lifetime_is_expired)
                LOG_INFO(log, "Lifetime of task " << node_name << " is expired, deleting it");
            else if (node_is_outside_max_window)
                LOG_INFO(log, "Task " << node_name << " is outdated, deleting it");

            /// Deleting
            {
                /// Remove every child except the lock we are holding.
                Strings children = zookeeper->getChildren(node_path);
                for (const String & child : children)
                {
                    if (child != "lock")
                        zookeeper->tryRemoveRecursive(node_path + "/" + child);
                }

                /// Remove the lock node and its parent atomically
                Coordination::Requests ops;
                ops.emplace_back(zkutil::makeRemoveRequest(lock_path, -1));
                ops.emplace_back(zkutil::makeRemoveRequest(node_path, -1));
                zookeeper->multi(ops);

                /// NOTE(review): presumably tells the lock object that its znode was already removed
                /// by the multi() above, so it does not try to delete it again — confirm.
                lock->unlockAssumeLockNodeRemovedManually();
            }
        }
        catch (...)
        {
            LOG_INFO(log, "An error occured while checking and cleaning task " << node_name << " from queue: " << getCurrentExceptionMessage(false));
        }
    }
}
民那晚安晚安。