先说总结
ClickHouse数据库中有很多不同level的MemoryTracker,包括线程级别、查询级别、用户级别、server级别,这些MemoryTracker会通过parent指针组织成一个树形结构,把内存申请释放信息层层反馈上去。
MemoryTrack中还有额外的峰值信息(peak)统计,内存上限检查,一旦某个查询线程的申请内存请求在上层(查询级别、用户级别、server级别)MemoryTracker遇到超过限制错误,查询线程就会抛出OOM(Out Of Memory)异常导致查询退出。同时查询线程的MemoryTracker每申请一定量的内存都会统计出当前的工作栈,非常方便排查内存OOM(Out Of Memory)的原因。
1.ThreadGroup
ClickHouse的MPP计算引擎中每个查询的主线程都会有一个ThreadGroup对象,每个MPP引擎worker线程在启动时必须要attach到ThreadGroup上,在线程退出时detach,这保证了整个资源追踪链路的完整传递。
ThreadGroup对象的代码定义如下:
/** Thread group is a collection of threads dedicated to single task
* (query or other process like background merge).
*
* ProfileEvents (counters) from a thread are propagated to thread group.
*
* Create via CurrentThread::initializeQuery (for queries) or directly (for various background tasks).
* Use via CurrentThread::getGroup.
*/
class ThreadGroupStatus
{
public:
mutable std::mutex mutex;
ProfileEvents::Counters performance_counters{VariableContext::Process};
MemoryTracker memory_tracker{VariableContext::Process};
Context * query_context = nullptr;
Context * global_context = nullptr;
InternalTextLogsQueueWeakPtr logs_queue_ptr;
std::function<void()> fatal_error_callback;
std::vector<UInt64> thread_ids;
/// The first thread created this thread group
UInt64 master_thread_id = 0;
LogsLevel client_logs_level = LogsLevel::none;
String query;
UInt64 normalized_query_hash;
};
using ThreadGroupStatusPtr = std::shared_ptr<ThreadGroupStatus>;
void PipelineExecutor::executeImpl(size_t num_threads)
{
OpenTelemetrySpanHolder span("PipelineExecutor::executeImpl()");
initializeExecution(num_threads);
using ThreadsData = std::vector<ThreadFromGlobalPool>;
ThreadsData threads;
threads.reserve(num_threads);
bool finished_flag = false;
SCOPE_EXIT(
if (!finished_flag)
{
finish();
for (auto & thread : threads)
if (thread.joinable())
thread.join();
}
);
if (num_threads > 1)
{
auto thread_group = CurrentThread::getGroup();
for (size_t i = 0; i < num_threads; ++i)
{
// 把lambda表达式作为子线程的参数,由于lambda表达式可以方便的捕获作用域中的变量,故可以作为子线程的参数
// 线程池中的thread真正的执行体, emplace_back到threads中之后的调度由操作系统控制
// 注意GlobalThreadPool基于std::thread实现
threads.emplace_back([this, thread_group, thread_num = i, num_threads]
{
/// ThreadStatus thread_status;
setThreadName("QueryPipelineEx");
if (thread_group)
CurrentThread::attachTo(thread_group);
SCOPE_EXIT(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
try
{
executeSingleThread(thread_num, num_threads);
}
catch (...)
{
/// In case of exception from executor itself, stop other threads.
finish();
executor_contexts[thread_num]->exception = std::current_exception();
}
});
}
#if defined(OS_LINUX)
{
/// Wait for async tasks.
std::unique_lock lock(task_queue_mutex);
while (auto task = async_task_queue.wait(lock))
{
auto * node = static_cast<ExecutingGraph::Node *>(task.data);
executor_contexts[task.thread_num]->async_tasks.push(node);
executor_contexts[task.thread_num]->has_async_tasks = true;
++num_waiting_async_tasks;
if (threads_queue.has(task.thread_num))
{
threads_queue.pop(task.thread_num);
wakeUpExecutor(task.thread_num);
}
}
}
#endif
for (auto & thread : threads)
if (thread.joinable())
thread.join();
}
else
executeSingleThread(0, num_threads);
finished_flag = true;
}
可以看到这个关键的代码行,顾名思义,是一个thread汇聚到一个thread的集合里。
CurrentThread::attachTo(thread_group);
2.Attach to ThreadGroup
具体Attach过程:
void ThreadStatus::attachQuery(const ThreadGroupStatusPtr & thread_group_, bool check_detached)
{
if (thread_state == ThreadState::AttachedToQuery)
{
if (check_detached)
throw Exception("Can't attach query to the thread, it is already attached", ErrorCodes::LOGICAL_ERROR);
return;
}
if (!thread_group_)
throw Exception("Attempt to attach to nullptr thread group", ErrorCodes::LOGICAL_ERROR);
setupState(thread_group_);
}
void ThreadStatus::setupState(const ThreadGroupStatusPtr & thread_group_)
{
assertState({ThreadState::DetachedFromQuery}, __PRETTY_FUNCTION__);
/// Attach or init current thread to thread group and copy useful information from it
thread_group = thread_group_;
performance_counters.setParent(&thread_group->performance_counters);
memory_tracker.setParent(&thread_group->memory_tracker);
{
std::lock_guard lock(thread_group->mutex);
/// NOTE: thread may be attached multiple times if it is reused from a thread pool.
thread_group->thread_ids.emplace_back(thread_id);
logs_queue_ptr = thread_group->logs_queue_ptr;
fatal_error_callback = thread_group->fatal_error_callback;
query_context = thread_group->query_context;
if (!global_context)
global_context = thread_group->global_context;
}
if (query_context)
{
applyQuerySettings();
// Generate new span for thread manually here, because we can't depend
// on OpenTelemetrySpanHolder due to link order issues.
thread_trace_context = query_context->query_trace_context;
if (thread_trace_context.trace_id)
{
thread_trace_context.span_id = thread_local_rng();
}
}
else
{
thread_trace_context.trace_id = 0;
}
initPerformanceCounters();
thread_state = ThreadState::AttachedToQuery;
}
这里的两行代码表示了当前thread和threadGroups的一个关系。
从这里也不难猜出threadGroups应该是有一个总池子,thread有一个自己的池子。
这里将threadGroups的池子设置为所有thread线程的总池子。
那么,这说明,threadGroups的一些设置,对于单个thread的执行,是有约束作用的。
我们看看在哪些地方有用到了attchgroup。
3.重载new\delete
如何把CurrentThread::MemoryTracker hook到系统的内存申请、释放上去?ClickHouse首先是重载了c++的new_delete operator,其次针对需要使用malloc的一些场景封装了特殊的Allocator同步内存申请释放。
文件所在路径: src/Common/new_delete.cpp
/// new
void * operator new(std::size_t size)
{
//这里我们一会儿重点看一下
Memory::trackMemory(size);
return Memory::newImpl(size);
}
void * operator new[](std::size_t size)
{
Memory::trackMemory(size);
return Memory::newImpl(size);
}
void * operator new(std::size_t size, const std::nothrow_t &) noexcept
{
if (likely(Memory::trackMemoryNoExcept(size)))
return Memory::newNoExept(size);
return nullptr;
}
void * operator new[](std::size_t size, const std::nothrow_t &) noexcept
{
if (likely(Memory::trackMemoryNoExcept(size)))
return Memory::newNoExept(size);
return nullptr;
}
/// delete
/// C++17 std 21.6.2.1 (11)
/// If a function without a size parameter is defined, the program should also define the corresponding function with a size parameter.
/// If a function with a size parameter is defined, the program shall also define the corresponding version without the size parameter.
/// cppreference:
/// It's unspecified whether size-aware or size-unaware version is called when deleting objects of
/// incomplete type and arrays of non-class and trivially-destructible class types.
void operator delete(void * ptr) noexcept
{
Memory::untrackMemory(ptr);
Memory::deleteImpl(ptr);
}
void operator delete[](void * ptr) noexcept
{
Memory::untrackMemory(ptr);
Memory::deleteImpl(ptr);
}
void operator delete(void * ptr, std::size_t size) noexcept
{
Memory::untrackMemory(ptr, size);
Memory::deleteSized(ptr, size);
}
void operator delete[](void * ptr, std::size_t size) noexcept
{
Memory::untrackMemory(ptr, size);
Memory::deleteSized(ptr, size);
}
重点看上面代码段中标注的一行。
inline ALWAYS_INLINE void trackMemory(std::size_t size)
{
std::size_t actual_size = size;
#if USE_JEMALLOC && JEMALLOC_VERSION_MAJOR >= 5
/// The nallocx() function allocates no memory, but it performs the same size computation as the mallocx() function
/// @note je_mallocx() != je_malloc(). It's expected they don't differ much in allocation logic.
if (likely(size != 0))
actual_size = nallocx(size, 0);
#endif
CurrentMemoryTracker::alloc(actual_size);
}
接下来看看alloc的实现,这个alloc并不是真正的alloc,是从限制中分配的alloc。
也就是说,真正的trick在这里,还记得刚才提到的池子么?
这个池子如果分配到了上限,那么,程序就提示报错了。
namespace CurrentMemoryTracker
{
using DB::current_thread;
void alloc(Int64 size)
{
if (auto * memory_tracker = getMemoryTracker())
{
if (current_thread)
{
current_thread->untracked_memory += size;
if (current_thread->untracked_memory > current_thread->untracked_memory_limit)
{
/// Zero untracked before track. If tracker throws out-of-limit we would be able to alloc up to untracked_memory_limit bytes
/// more. It could be useful to enlarge Exception message in rethrow logic.
Int64 tmp = current_thread->untracked_memory;
current_thread->untracked_memory = 0;
memory_tracker->alloc(tmp);
}
}
/// total_memory_tracker only, ignore untracked_memory
else
{
memory_tracker->alloc(size);
}
}
}
void MemoryTracker::alloc(Int64 size)
{
if (size < 0)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Negative size ({}) is passed to MemoryTracker. It is a bug.", size);
if (BlockerInThread::isBlocked(level))
{
/// Since the BlockerInThread should respect the level, we should go to the next parent.
if (auto * loaded_next = parent.load(std::memory_order_relaxed))
loaded_next->alloc(size);
return;
}
/** Using memory_order_relaxed means that if allocations are done simultaneously,
* we allow exception about memory limit exceeded to be thrown only on next allocation.
* So, we allow over-allocations.
*/
Int64 will_be = size + amount.fetch_add(size, std::memory_order_relaxed);
auto metric_loaded = metric.load(std::memory_order_relaxed);
if (metric_loaded != CurrentMetrics::end())
CurrentMetrics::add(metric_loaded, size);
//这里有一个hard_limit,也就是内存的上限,如果超过这个上限,那么,要强制报错了。
Int64 current_hard_limit = hard_limit.load(std::memory_order_relaxed);
Int64 current_profiler_limit = profiler_limit.load(std::memory_order_relaxed);
/// Cap the limit to the total_memory_tracker, since it may include some drift
/// for user-level memory tracker.
///
/// And since total_memory_tracker is reset to the process resident
/// memory peridically (in AsynchronousMetrics::update()), any limit can be
/// capped to it, to avoid possible drift.
if (unlikely(current_hard_limit
&& will_be > current_hard_limit
&& level == VariableContext::User))
{
Int64 total_amount = total_memory_tracker.get();
if (amount > total_amount)
{
set(total_amount);
will_be = size + total_amount;
}
}
#ifdef MEMORY_TRACKER_DEBUG_CHECKS
if (unlikely(_memory_tracker_always_throw_logical_error_on_allocation))
{
_memory_tracker_always_throw_logical_error_on_allocation = false;
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Memory tracker: allocations not allowed.");
}
#endif
std::bernoulli_distribution fault(fault_probability);
if (unlikely(fault_probability && fault(thread_local_rng)) && memoryTrackerCanThrow(level, true))
{
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
BlockerInThread untrack_lock(VariableContext::Global);
ProfileEvents::increment(ProfileEvents::QueryMemoryLimitExceeded);
const auto * description = description_ptr.load(std::memory_order_relaxed);
amount.fetch_sub(size, std::memory_order_relaxed);
throw DB::Exception(DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED,
"Memory tracker{}{}: fault injected. Would use {} (attempt to allocate chunk of {} bytes), maximum: {}",
description ? " " : "", description ? description : "",
formatReadableSizeWithBinarySuffix(will_be),
size, formatReadableSizeWithBinarySuffix(current_hard_limit));
}
if (unlikely(current_profiler_limit && will_be > current_profiler_limit))
{
BlockerInThread untrack_lock(VariableContext::Global);
DB::TraceCollector::collect(DB::TraceType::Memory, StackTrace(), size);
setOrRaiseProfilerLimit((will_be + profiler_step - 1) / profiler_step * profiler_step);
}
std::bernoulli_distribution sample(sample_probability);
if (unlikely(sample_probability && sample(thread_local_rng)))
{
BlockerInThread untrack_lock(VariableContext::Global);
DB::TraceCollector::collect(DB::TraceType::MemorySample, StackTrace(), size);
}
//触发内存超限条件
if (unlikely(current_hard_limit && will_be > current_hard_limit) && memoryTrackerCanThrow(level, false))
{
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
BlockerInThread untrack_lock(VariableContext::Global);
ProfileEvents::increment(ProfileEvents::QueryMemoryLimitExceeded);
const auto * description = description_ptr.load(std::memory_order_relaxed);
amount.fetch_sub(size, std::memory_order_relaxed);
throw DB::Exception(DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED,
"Memory limit{}{} exceeded: would use {} (attempt to allocate chunk of {} bytes), maximum: {}",
description ? " " : "", description ? description : "",
formatReadableSizeWithBinarySuffix(will_be),
size, formatReadableSizeWithBinarySuffix(current_hard_limit));
}
updatePeak(will_be);
//everything goes ok until here , normal way will be executing..
if (auto * loaded_next = parent.load(std::memory_order_relaxed))
loaded_next->alloc(size);
}
为了解决内存追踪的性能问题,每个线程的内存申请释放会在thread local变量上进行积攒,最后以大块内存的形式同步给MemoryTracker。
/** Tracks memory consumption.
* It throws an exception if amount of consumed memory become greater than certain limit.
* The same memory tracker could be simultaneously used in different threads.
*/
class MemoryTracker
{
std::atomic<Int64> amount {0};
std::atomic<Int64> peak {0};
std::atomic<Int64> hard_limit {0};
std::atomic<Int64> profiler_limit {0};
Int64 profiler_step = 0;
/// To test exception safety of calling code, memory tracker throws an exception on each memory allocation with specified probability.
double fault_probability = 0;
/// To randomly sample allocations and deallocations in trace_log.
double sample_probability = 0;
/// Singly-linked list. All information will be passed to subsequent memory trackers also (it allows to implement trackers hierarchy).
/// In terms of tree nodes it is the list of parents. Lifetime of these trackers should "include" lifetime of current tracker.
std::atomic<MemoryTracker *> parent {};
/// You could specify custom metric to track memory usage.
CurrentMetrics::Metric metric = CurrentMetrics::end();
/// This description will be used as prefix into log messages (if isn't nullptr)
std::atomic<const char *> description_ptr = nullptr;
......
}
4. MemoryTrack 层级
上面所有论述讲述的是thread级别的memoryTracker设置。
下面看看User 级别的memoryTracker设置和Query级别的设置。
server参数设置为settings.max_memory_usage_for_user
我们来深入看一把ClickHouse打标user和query memoryTrack的逻辑。
首先,在执行Query的时候,ClickHouse server会对Query进行分析,交由ProcessList类对executeQuery时做了一部分检查工作。
包括一系列的阈值检查。比如是否超过某个user的最大并发执行数。
接下来ProcessList对象会设置memoryTracker。如下图所示。
标红的两段代码一个设置了single user级别的multiple queries的memory限制。一个设置了single query级别的memory限制。
执行顺序决定了memoryTrack的层级。
每一个线程执行,在runImp时,就已经生成了一个thread_status。
这个status后续会和clickhouse server从global线程池中分配的线程做一个绑定。这也是为什么文章开头的地方起名attach的缘故。
void CurrentThread::attachTo(const ThreadGroupStatusPtr & thread_group)
{
if (unlikely(!current_thread))
return;
current_thread->attachQuery(thread_group, true);
current_thread->deleter = CurrentThread::defaultThreadDeleter;
}
void ThreadStatus::attachQuery(const ThreadGroupStatusPtr & thread_group_, bool check_detached)
{
if (thread_state == ThreadState::AttachedToQuery)
{
if (check_detached)
throw Exception("Can't attach query to the thread, it is already attached", ErrorCodes::LOGICAL_ERROR);
return;
}
if (!thread_group_)
throw Exception("Attempt to attach to nullptr thread group", ErrorCodes::LOGICAL_ERROR);
setupState(thread_group_);
}
void ThreadStatus::setupState(const ThreadGroupStatusPtr & thread_group_)
{
assertState({ThreadState::DetachedFromQuery}, __PRETTY_FUNCTION__);
/// Attach or init current thread to thread group and copy useful information from it
thread_group = thread_group_;
performance_counters.setParent(&thread_group->performance_counters);
//注意,这里设置了thread的memoryTrack的parent是thread_group。
memory_tracker.setParent(&thread_group->memory_tracker);
{
std::lock_guard lock(thread_group->mutex);
/// NOTE: thread may be attached multiple times if it is reused from a thread pool.
thread_group->thread_ids.emplace_back(thread_id);
logs_queue_ptr = thread_group->logs_queue_ptr;
fatal_error_callback = thread_group->fatal_error_callback;
query_context = thread_group->query_context;
if (!global_context)
global_context = thread_group->global_context;
}
if (query_context)
{
applyQuerySettings();
// Generate new span for thread manually here, because we can't depend
// on OpenTelemetrySpanHolder due to link order issues.
thread_trace_context = query_context->query_trace_context;
if (thread_trace_context.trace_id)
{
thread_trace_context.span_id = thread_local_rng();
}
}
else
{
thread_trace_context.trace_id = 0;
}
initPerformanceCounters();
thread_state = ThreadState::AttachedToQuery;
}
从上述代码中可以看到thread在初始化之后,已经设置了thread的memoryTrack的parent是thread_group。
接下来,我们回到ProcessList看看执行完对象逻辑后,memoryTracker的层级变为什么。
如上图示,针对threadGroup设置层级为 for query -> for user -> total,刚才提到thread的parent已经是threadGroup。所以对于thread来说,memoryTracker的层级是:
for thread -> for query -> for user -> total
至此,我们基本理顺了memoryTracker的层级关系。
但是我们到现在为止并没有看到哪里有设置Setparent(totalMemoryTracker)的地方。
这里,不管后来的parent如何设置,新加入的节点的parent一定是total_memory_tracker。
clickhouse server 在启动的时候设置了 total_memory_tracker。
5.整体MemoryTracker流程
从上面的1、2、3、4点我们可以了解到。
对于一个Query来说。
有3个memory限制的关卡。分别对应参数为:
max_memory_usage, "Maximum memory usage for processing of single query. Zero means unlimited."
max_memory_usage_for_user, "Maximum memory usage for processing all concurrently running queries for the user. Zero means unlimited."
上面两个是对于user级别的限定。
<!-- Maximum memory usage (resident set size) for server process.
Zero value or unset means default. Default is "max_server_memory_usage_to_ram_ratio" of available physical RAM.
If the value is larger than "max_server_memory_usage_to_ram_ratio" of available physical RAM, it will be cut down.
The constraint is checked on query execution time.
If a query tries to allocate memory and the current memory usage plus allocation is greater
than specified threshold, exception will be thrown.
It is not practical to set this constraint to small values like just a few gigabytes,
because memory allocator will keep this amount of memory in caches and the server will deny service of queries.
-->
<max_server_memory_usage>0</max_server_memory_usage>
上面这一个,是总的内存限定。这个值并没有在setting.h中有默认值,这个如果在配置文件中不指定的话,就是0,不设上限。
所以这个设置应该是: max_memory_usage < max_memory_usage_for_user < max_server_memory_usage。
参考
云数据库ClickHouse资源隔离-弹性资源队列
https://developer.aliyun.com/article/780376?utm_content=g_1000223973