PyTorch CUDACachingAllocator

介绍

通过阅读CUDACachingAllocator的源码,详细介绍CUDACachingAllocator的实现原理,以及如何使用CUDACachingAllocator来管理GPU内存。
CUDACachingAllocator是PyTorch中一个GPU显存缓存分配器,它可以缓存从GPU分配的内存,管理框架内部数据的分配释放,减少频繁的cudaMalloc和cudaFree开销。
CUDACachingAllocator的实现原理是通过一个内存池来管理GPU内存,提供分配释放接口,并且通过内存分配策略减少内存碎片,提高内存利用率。除了分配释放接口,CUDACachingAllocator还提供了一些其他接口,如内存池的使用情况等功能。

实现原理

内存池及内存分配策略

整体上分配器维持两个空闲内存池,一个大块内存池,一个小块内存池。大块内存池用于分配大于等于1MB的内存,小块内存池用于分配小于1MB的内存。大小块的内存分配策略都是最佳适应策略。

  • Bolck表示分配器中一块内存块,保存块大小,指针地址等大小。
struct Block {
  int device; // gpu
  cudaStream_t stream; // allocation stream
  stream_set stream_uses; // streams on which the block was used
  size_t size; // block size in bytes
  size_t requested_size; // memory originally requested
  BlockPool* pool{nullptr}; // owning memory pool
  void* ptr{nullptr}; // memory address
  bool allocated{false}; // in-use flag
  Block* prev{nullptr}; // prev block if split from a larger allocation
  Block* next{nullptr}; // next block if split from a larger allocation
  int event_count{0}; // number of outstanding CUDA events
  int gc_count{0}; // counter for prioritizing older / less useful blocks for
                   // garbage collection
  std::unique_ptr<HistoryChain> history;
  HistoryChain* history_last{nullptr};
};
  • BlockComparator是Block比较器,根据块大小比较,便于Block的排序
static bool BlockComparator(const Block* a, const Block* b) {
  if (a->stream != b->stream) {
    return (uintptr_t)a->stream < (uintptr_t)b->stream;
  }
  if (a->size != b->size) {
    return a->size < b->size;
  }
  return (uintptr_t)a->ptr < (uintptr_t)b->ptr;
}
  • BlockPool用set数据结构管理Block,根据Comparison排序,方便查找,插入,删除等操作。
struct BlockPool {
  BlockPool(
      Comparison comparator,
      bool small,
      PrivatePool* private_pool = nullptr)
      : blocks(comparator), is_small(small), owner_PrivatePool(private_pool) {}
  std::set<Block*, Comparison> blocks;
  const bool is_small;
  PrivatePool* owner_PrivatePool;
};

分配释放

DeviceCachingAllocator类维持一个大块内存池,一个小块内存池,和一个使用中的内存池

class DeviceCachingAllocator{
    // unallocated cached blocks larger than 1 MB
  BlockPool large_blocks;

  // unallocated cached blocks 1 MB or smaller
  BlockPool small_blocks;

  // allocated or in use by a stream. Holds all active allocations,
  // whether they came from graph_pools or one of the BlockPools above.
  ska::flat_hash_set<Block*> active_blocks;
};

malloc
流程
  1. 根据请求大小选择大块内存池或者小块的分配池
  2. 根据最佳分配策略从空闲的内存池中查找合适的块,如果找到从空闲内存池中删除这个块然后转到第6步,否则转到第3步
  3. 通过cudaMalloc分配一块内存,如果分配成功则转到第6步,否则转到第4步
  4. 在选中的内存池中,尽可能(释放的内存大于请求大小即停止)通过cudaFree释放一些内存,然后调用cudaMalloc分配一块内存,如果分配成功则转到第6步,否则转到第5步
  5. 通过cudaFree释放大小块空闲内存池,然后调用cudaMalloc分配一块内存,如果分配成功则转到第6步,否则抛出OOM异常
  6. 如果查找到的块能被切分,则切分块,将剩余的块插入到空闲内存池中。将分配的块放入使用中内存池中,然后返回分配的块。
Block* malloc(int device, size_t orig_size, cudaStream_t stream) {

    std::unique_lock<std::recursive_mutex> lock(mutex);

    size_t size = round_size(orig_size);
    auto& pool = get_pool(size, stream);
    const size_t alloc_size = get_allocation_size(size);
    AllocParams params(device, size, stream, &pool, alloc_size, stats);
    
    // First, try to get a block from the existing pool.
    bool block_found =
        // Search pool
        get_free_block(params)
        // Trigger callbacks and retry search
        || (trigger_free_memory_callbacks(params) && get_free_block(params));

    // Can't reuse an existing block; try to get a new one.
    if (!block_found) {
      // Do garbage collection if the flag is set.
      if (C10_UNLIKELY(
              set_fraction &&
              CachingAllocatorConfig::garbage_collection_threshold() > 0.0)) {
        garbage_collect_cached_blocks();
      }
      // Attempt allocate
      block_found = alloc_block(params, false)
          // Free enough available cached blocks to satisfy alloc and retry
          // alloc.
          || (release_available_cached_blocks(params) &&
              alloc_block(params, false))
          // Free all non-split cached blocks and retry alloc.
          || (C10_LIKELY(captures_underway == 0) && release_cached_blocks() &&
              alloc_block(params, true));
    }

    if (!block_found) {
        抛出oom异常
    }

    TORCH_INTERNAL_ASSERT(
        params.err == cudaSuccess && params.block != nullptr &&
        params.block->ptr != nullptr);
    Block* block = params.block;
    Block* remaining = nullptr;

    const bool already_split = block->is_split();
    if (should_split(block, size)) {
      remaining = block;
      block = new Block(device, stream, size, &pool, block->ptr);
      block->prev = remaining->prev;
      if (block->prev) {
        block->prev->next = block;
      }
      block->next = remaining;

      remaining->prev = block;
      remaining->ptr = static_cast<char*>(remaining->ptr) + size;
      remaining->size -= size;
      bool inserted = pool.blocks.insert(remaining).second;
      TORCH_INTERNAL_ASSERT_DEBUG_ONLY(inserted);
    }

    block->allocated = true;
    block->requested_size = orig_size;
    
    bool inserted = active_blocks.insert(block).second;
    TORCH_INTERNAL_ASSERT_DEBUG_ONLY(inserted);
    ....
    return block;
  }
free

free将block放入空闲的内存池,如果block的前后块都是空闲的,将block合并到前后块中,然后将合并后的块放入空闲内存池中。block前后块通过prev和next指针连接,block的状态通过allocated标志位表示。

  /** moves a block into a pool of cached free blocks */
  void free_block(Block* block) {
    TORCH_INTERNAL_ASSERT(
        !block->allocated && block->event_count == 0 &&
        block->stream_uses.empty());
    size_t original_block_size = block->size;
    size_t requested_size = block->requested_size;

    auto& pool = *block->pool;
    int64_t net_change_inactive_split_blocks = 0;
    int64_t net_change_inactive_split_size = 0;

    const std::array<Block*, 2> merge_candidates = {block->prev, block->next};
    for (Block* merge_candidate : merge_candidates) {
      const int64_t subsumed_size =
          try_merge_blocks(block, merge_candidate, pool);
      if (subsumed_size > 0) {
        net_change_inactive_split_blocks -= 1;
        net_change_inactive_split_size -= subsumed_size;
      }
    }
    active_blocks.erase(block);
    // Makes sure the Block* isn't already present in the pool we're freeing it
    // back into.
    bool inserted = pool.blocks.insert(block).second;
    TORCH_INTERNAL_ASSERT(inserted);

  }

内存池使用情况

描述内存池使用情况的结构体

内存池使用情况(如内存池当前使用大小,空闲大小,峰值使用量等)可以通过DeviceStats getDeviceStats(int device)获取,其中DeviceStats的定义如下:

// Struct containing memory allocator summary statistics for a device.
struct DeviceStats {
  // COUNT: allocations requested by client code
  StatArray allocation;
  // COUNT: number of allocated segments from cudaMalloc().
  StatArray segment;
  // COUNT: number of active memory blocks (allocated or used by stream)
  StatArray active;
  // COUNT: number of inactive, split memory blocks (unallocated but can't be
  // released via cudaFree)
  StatArray inactive_split;

  // SUM: bytes allocated by this memory alocator
  StatArray allocated_bytes;
  // SUM: bytes reserved by this memory allocator (both free and used)
  StatArray reserved_bytes;
  // SUM: bytes within active memory blocks
  StatArray active_bytes;
  // SUM: bytes within inactive, split memory blocks
  StatArray inactive_split_bytes;
  // SUM: bytes requested by client code
  StatArray requested_bytes;

  // COUNT: total number of failed calls to CUDA malloc necessitating cache
  // flushes.
  int64_t num_alloc_retries = 0;

  // COUNT: total number of OOMs (i.e. failed calls to CUDA after cache flush)
  int64_t num_ooms = 0;

  // COUNT: total number of oversize blocks allocated from pool
  Stat oversize_allocations;

  // COUNT: total number of oversize blocks requiring malloc
  Stat oversize_segments;

  // SIZE: maximum block size that is allowed to be split.
  int64_t max_split_size = 0;
};

StatArray是状态数组,定义如下

struct Stat {
  int64_t current = 0;
  int64_t peak = 0;
  int64_t allocated = 0;
  int64_t freed = 0;
};

enum struct StatType : uint64_t {
  AGGREGATE = 0,
  SMALL_POOL = 1,
  LARGE_POOL = 2,
  NUM_TYPES = 3 // remember to update this whenever a new stat type is added
};

typedef std::array<Stat, static_cast<size_t>(StatType::NUM_TYPES)> StatArray;
更新内存池使用状态

在内存池malloc,free等操作中,分配器都会更新内存池的状态,更新状态函数如下:

void update_stat(Stat& stat, int64_t amount) {
  stat.current += amount;

  TORCH_INTERNAL_ASSERT_DEBUG_ONLY(
      stat.current >= 0,
      "Negative tracked stat in CUDA allocator (likely logic error).");

  stat.peak = std::max(stat.current, stat.peak);
  if (amount > 0) {
    stat.allocated += amount;
  }
  if (amount < 0) {
    stat.freed += -amount;
  }
}

例如malloc会更新状态分配次数,分配的内存块大小,活跃的内存块大小,请求的内存大小

for_each_selected_stat_type(params.stat_types, [&](size_t stat_type) {
      update_stat(stats.allocation[stat_type], 1);
      update_stat(
          stats.allocated_bytes[stat_type],
          static_cast<std::int64_t>(block->size));
      update_stat(stats.active[stat_type], 1);
      update_stat(
          stats.active_bytes[stat_type],
          static_cast<std::int64_t>(block->size));
      update_stat(
          stats.requested_bytes[stat_type],
          static_cast<std::int64_t>(block->requested_size));
    });
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容