这段时间基本每天早上花一个钟头来看些书,正好看到stl源码剖析的前几章,其中关于SGI内存分配的介绍,具体分析可以参考那本书籍。之前知道的有些框架,比如tcmalloc的内存分配方式,leveldb会预先分配一大块内存,然后再进行分割,主要是解决一些碎片及性能问题。对于单线程情况,直接malloc/new一块大的在本地,然后慢慢使用或者按照一定的策略分割成各大小的可用空闲块,并插入到freelist,中间就不用再做malloc这种相对复杂的工作;当然另一方面可能造成浪费。malloc的实现相对复杂,在一些对性能要求极高的情况下会差些,一般情况下足够用,具体可以参考malloc原理和内存碎片:
当一个进程发生缺页中断的时候,进程会陷入内核态,执行以下操作:
1、检查要访问的虚拟地址是否合法
2、查找/分配一个物理页
3、填充物理页内容(读取磁盘,或者直接置0,或者啥也不干)
4、建立映射关系(虚拟地址到物理地址)
重新执行发生缺页中断的那条指令
如果第3步,需要读取磁盘,那么这次缺页中断就是majflt,否则就是minflt。
因为在一些多线程程序中使用内存是直接malloc/free的,所以里面肯定是有锁来同步。这里准备先分析下dpdk中的rte_malloc
相关实现,早些工作的时候分析过一些。
63 /**
64 * This function allocates memory from the huge-page area of memory. The memory
65 * is not cleared. In NUMA systems, the memory allocated resides on the same
66 * NUMA socket as the core that calls this function.
67 */
68 void *
69 rte_malloc_socket(const char *type, size_t size, unsigned align, int socket_arg)
70 {
71 struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config;
72 int socket, i;
73 void *ret;
74
75 /* return NULL if size is 0 or alignment is not power-of-2 */
76 if (size == 0 || (align && !rte_is_power_of_2(align)))
77 return NULL;
78
79 if (!rte_eal_has_hugepages())
80 socket_arg = SOCKET_ID_ANY;
81
82 if (socket_arg == SOCKET_ID_ANY)
83 socket = malloc_get_numa_socket(); /* NUMA node of the calling lcore */
84 else
85 socket = socket_arg;
86
87 /* Check socket parameter */
88 if (socket >= RTE_MAX_NUMA_NODES)
89 return NULL;
90
91 ret = malloc_heap_alloc(&mcfg->malloc_heaps[socket], type,
92 size, 0, align == 0 ? 1 : align, 0);
93 if (ret != NULL || socket_arg != SOCKET_ID_ANY) /* success, or caller pinned a specific socket: no fallback */
94 return ret;
96 /* try other heaps */
97 for (i = 0; i < RTE_MAX_NUMA_NODES; i++) {
98 /* we already tried this one */
99 if (i == socket)
100 continue;
101
102 ret = malloc_heap_alloc(&mcfg->malloc_heaps[i], type,
103 size, 0, align == 0 ? 1 : align, 0);
104 if (ret != NULL)
105 return ret;
106 }
108 return NULL;
109 }
52 struct rte_mem_config {
77 // more fields elided ..
78 /* Heaps of Malloc per socket */
79 struct malloc_heap malloc_heaps[RTE_MAX_NUMA_NODES];
85 } __attribute__((__packed__)); /* packed: presumably so the layout matches across primary/secondary processes mapping the shared config — TODO confirm */
48 struct malloc_heap {
49 rte_spinlock_t lock; /* serializes alloc/free on this per-socket heap */
50 LIST_HEAD(, malloc_elem) free_head[RTE_HEAP_NUM_FREELISTS]; /* size-classed free lists */
51 unsigned alloc_count; /* number of currently allocated elements */
52 size_t total_size; /* total bytes managed by this heap */
53 } __rte_cache_aligned;
如上面的注释说明,一般在numa架构中,core使用本地内存会更快一些。判断是否可用socket_node并就近分配,如果没有足够的内存则跨socket_node继续查找分配:
148 /*
149 * Main function to allocate a block of memory from the heap.
150 * It locks the free list, scans it, and adds a new memseg if the
151 * scan fails. Once the new memseg is added, it re-scans and should return
152 * the new element after releasing the lock.
153 */
154 void *
155 malloc_heap_alloc(struct malloc_heap *heap,
156 const char *type __attribute__((unused)), size_t size, unsigned flags,
157 size_t align, size_t bound)
158 {
159 struct malloc_elem *elem;
160
161 size = RTE_CACHE_LINE_ROUNDUP(size); /* round request up to a cache-line multiple */
162 align = RTE_CACHE_LINE_ROUNDUP(align);
163
164 rte_spinlock_lock(&heap->lock); /* one spinlock per NUMA-socket heap */
165
166 elem = find_suitable_element(heap, size, flags, align, bound);
167 if (elem != NULL) {
168 elem = malloc_elem_alloc(elem, size, align, bound);
169 /* increase heap's count of allocated elements */
170 heap->alloc_count++;
171 }
172 rte_spinlock_unlock(&heap->lock);
173
174 return elem == NULL ? NULL : (void *)(&elem[1]); /* payload starts just past the element header */
175 }
以上把内存大小调整到cacheline倍数,并对相应的heap加自旋锁,这里进行了两步,先查找,再分配:
122 static struct malloc_elem *
123 find_suitable_element(struct malloc_heap *heap, size_t size,
124 unsigned flags, size_t align, size_t bound)
125 {
126 size_t idx;
127 struct malloc_elem *elem, *alt_elem = NULL;
128
129 for (idx = malloc_elem_free_list_index(size); /* start at the smallest list that can hold size */
130 idx < RTE_HEAP_NUM_FREELISTS; idx++) {
131 for (elem = LIST_FIRST(&heap->free_head[idx]);
132 !!elem; elem = LIST_NEXT(elem, free_list)) {
133 if (malloc_elem_can_hold(elem, size, align, bound)) {
134 if (check_hugepage_sz(flags, elem->ms->hugepage_sz)) /* prefer an element whose hugepage size matches flags */
135 return elem;
136 if (alt_elem == NULL) /* remember first fit with a mismatched page size */
137 alt_elem = elem;
138 }
139 }
140 }
141
142 if ((alt_elem != NULL) && (flags & RTE_MEMZONE_SIZE_HINT_ONLY)) /* mismatched page size acceptable only as a hint */
143 return alt_elem;
144
145 return NULL;
146 }
从上面的查找,其实可以发现这里面的实现是怎么样的了,如stl sgi中的二次分配器,从最靠近的size索引处查找,只不过那边是8,算出内存大小size所属哪个freelist_index:
154 size_t
155 malloc_elem_free_list_index(size_t size)
156 {
157 #define MALLOC_MINSIZE_LOG2 8
158 #define MALLOC_LOG2_INCREMENT 2
159
160 size_t log2;
161 size_t index;
162
163 if (size <= (1UL << MALLOC_MINSIZE_LOG2))
164 return 0;
165
166 /* Find next power of 2 >= size. */
167 log2 = sizeof(size) * 8 - __builtin_clzl(size-1); /* __builtin_clzl = count of leading zero bits before the first set bit */
168
169 /* Compute freelist index, based on log2(size). */
170 index = (log2 - MALLOC_MINSIZE_LOG2 + MALLOC_LOG2_INCREMENT - 1) /
171 MALLOC_LOG2_INCREMENT;
172
173 return index <= RTE_HEAP_NUM_FREELISTS-1? /* clamp to the last (largest) free list */
174 index: RTE_HEAP_NUM_FREELISTS-1;
175 }
比如size<=2^8 的时候,index=0;2^8 <size<=2^10 的时候为1;这里是要算的。因为size经过cacheline对齐,一定是64的倍数,也是2的某几次方,那么log2算出后的是size<=2^(log2),如注释举例:
148 * heap->free_head[0] - (0 , 2^8]
149 * heap->free_head[1] - (2^8 , 2^10]
150 * heap->free_head[2] - (2^10 ,2^12]
151 * heap->free_head[3] - (2^12, 2^14]
152 * heap->free_head[4] - (2^14, MAX_SIZE]
回到最初,一直使用的struct rte_mem_config *mem_config
是如何分配的呢?调用顺序如下:rte_eal_init
->rte_config_init
->rte_eal_config_create
:
//create memory configuration in shared/mmap memory
205 rte_mem_cfg_addr = mmap(rte_mem_cfg_addr, sizeof(*rte_config.mem_config),
206 PROT_READ | PROT_WRITE, MAP_SHARED, mem_cfg_fd, 0);
207
208 if (rte_mem_cfg_addr == MAP_FAILED){
209 rte_panic("Cannot mmap memory for rte_config\n");
210 }
211 memcpy(rte_mem_cfg_addr, &early_mem_config, sizeof(early_mem_config));
212 rte_config.mem_config = (struct rte_mem_config *) rte_mem_cfg_addr;
213
214 /* store address of the config in the config itself so that secondary
215 * processes could later map the config into this exact location */
216 rte_config.mem_config->mem_cfg_addr = (uintptr_t) rte_mem_cfg_addr;
完成配置初始化后:
682 inline static void
683 rte_eal_mcfg_complete(void)
684 {
685 /* ALL shared mem_config related INIT DONE */
686 if (rte_config.process_type == RTE_PROC_PRIMARY) /* only the primary process publishes the magic */
687 rte_config.mem_config->magic = RTE_MAGIC;
688 }
接着预分配内存rte_eal_memzone_init
:
420 int
421 rte_eal_memzone_init(void)
422 {
423 struct rte_mem_config *mcfg;
424 const struct rte_memseg *memseg;
425
426 /* get pointer to global configuration */
427 mcfg = rte_eal_get_configuration()->mem_config;
428
429 /* secondary processes don't need to initialise anything */
430 if (rte_eal_process_type() == RTE_PROC_SECONDARY)
431 return 0;
432
433 memseg = rte_eal_get_physmem_layout(); /* array of memory segments discovered at EAL init */
434 if (memseg == NULL) {
436 return -1;
437 }
438
439 rte_rwlock_write_lock(&mcfg->mlock);
440
441 /* delete all zones */
442 mcfg->memzone_cnt = 0;
443 memset(mcfg->memzone, 0, sizeof(mcfg->memzone));
444
445 rte_rwlock_write_unlock(&mcfg->mlock);
447 return rte_eal_malloc_heap_init(); /* carve the memsegs into per-socket heaps */
448 }
211 int
212 rte_eal_malloc_heap_init(void)
213 {
214 struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config;
215 unsigned ms_cnt;
216 struct rte_memseg *ms;
217
218 if (mcfg == NULL)
219 return -1;
220
221 for (ms = &mcfg->memseg[0], ms_cnt = 0;
222 (ms_cnt < RTE_MAX_MEMSEG) && (ms->len > 0); /* stop at the first empty segment */
223 ms_cnt++, ms++) {
232 malloc_heap_add_memseg(&mcfg->malloc_heaps[ms->socket_id], ms); /* each memseg goes to the heap of its NUMA socket */
233 }
234
235 return 0;
236 }
99 static void
100 malloc_heap_add_memseg(struct malloc_heap *heap, struct rte_memseg *ms)
101 {
102 /* allocate the memory block headers, one at end, one at start */
103 struct malloc_elem *start_elem = (struct malloc_elem *)ms->addr;
104 struct malloc_elem *end_elem = RTE_PTR_ADD(ms->addr,
105 ms->len - MALLOC_ELEM_OVERHEAD); /* leave room for the sentinel's header+trailer */
106 end_elem = RTE_PTR_ALIGN_FLOOR(end_elem, RTE_CACHE_LINE_SIZE);
107 const size_t elem_size = (uintptr_t)end_elem - (uintptr_t)start_elem;
108
109 malloc_elem_init(start_elem, heap, ms, elem_size); /* one big free element covering the segment */
110 malloc_elem_mkend(end_elem, start_elem); /* zero-size busy sentinel, never merged */
111 malloc_elem_free_list_insert(start_elem);
112
113 heap->total_size += elem_size;
114 }
56 void
57 malloc_elem_init(struct malloc_elem *elem,
58 struct malloc_heap *heap, const struct rte_memseg *ms, size_t size)
59 {
60 elem->heap = heap;
61 elem->ms = ms;
62 elem->prev = NULL;
63 memset(&elem->free_list, 0, sizeof(elem->free_list));
64 elem->state = ELEM_FREE;
65 elem->size = size;
66 elem->pad = 0;
67 set_header(elem); /* write the header cookie for later validation */
68 set_trailer(elem); /* write the trailer cookie at elem + size - trailer_len */
69 }
79 static const unsigned MALLOC_ELEM_TRAILER_LEN = RTE_CACHE_LINE_SIZE;
80
81 #define MALLOC_HEADER_COOKIE 0xbadbadbadadd2e55ULL /**< Header cookie. */
82 #define MALLOC_TRAILER_COOKIE 0xadd2e55badbadbadULL /**< Trailer cookie.*/
83
84 /* define macros to make referencing the header and trailer cookies easier */
85 #define MALLOC_ELEM_TRAILER(elem) (*((uint64_t*)RTE_PTR_ADD(elem, \
86 elem->size - MALLOC_ELEM_TRAILER_LEN)))
87 #define MALLOC_ELEM_HEADER(elem) (elem->header_cookie)
88
89 static inline void
90 set_header(struct malloc_elem *elem)
91 {
92 if (elem != NULL)
93 MALLOC_ELEM_HEADER(elem) = MALLOC_HEADER_COOKIE;
94 }
95
96 static inline void
97 set_trailer(struct malloc_elem *elem)
98 {
99 if (elem != NULL)
100 MALLOC_ELEM_TRAILER(elem) = MALLOC_TRAILER_COOKIE;
101 }
74 void
75 malloc_elem_mkend(struct malloc_elem *elem, struct malloc_elem *prev)
76 {
77 malloc_elem_init(elem, prev->heap, prev->ms, 0);
78 elem->prev = prev;
79 elem->state = ELEM_BUSY; /* mark busy so its never merged */
80 }
114 static const unsigned MALLOC_ELEM_HEADER_LEN = sizeof(struct malloc_elem);
115 #define MALLOC_ELEM_OVERHEAD (MALLOC_ELEM_HEADER_LEN + MALLOC_ELEM_TRAILER_LEN)
以上分配可用空间,并在end_elem
预留一定的内存空间(MALLOC_ELEM_OVERHEAD
),在start_elem
首尾设置cookie,作用后面分析,并根据cacheline对齐end_elem
:
|<head_cookie><...><tail_cookie>|初始化就分两个元素,一个覆盖整块内存的空闲元素和一个end_elem
,后者就大小MALLOC_ELEM_OVERHEAD
。
180 void
181 malloc_elem_free_list_insert(struct malloc_elem *elem)
182 {
183 size_t idx;
184
185 idx = malloc_elem_free_list_index(elem->size - MALLOC_ELEM_HEADER_LEN); /* index by usable size, header excluded */
186 elem->state = ELEM_FREE;
187 LIST_INSERT_HEAD(&elem->heap->free_head[idx], elem, free_list);
188 }
把分配好的内存插入到适合的位置中去,这里策略是不包含struct malloc_elem
占用的内存空间。回到malloc_elem_can_hold
,其中会检查当前剩余内存是否满足size的需求:
87 static void *
88 elem_start_pt(struct malloc_elem *elem, size_t size, unsigned align,
89 size_t bound)
90 {
91 const size_t bmask = ~(bound - 1);
92 uintptr_t end_pt = (uintptr_t)elem +
93 elem->size - MALLOC_ELEM_TRAILER_LEN; /* excludes the trailer-cookie space */
94 uintptr_t new_data_start = RTE_ALIGN_FLOOR((end_pt - size), align); /* place data as high as possible, aligned down */
95 uintptr_t new_elem_start;
96
97 /* check boundary */
98 if ((new_data_start & bmask) != ((end_pt - 1) & bmask)) {
99 end_pt = RTE_ALIGN_FLOOR(end_pt, bound);
100 new_data_start = RTE_ALIGN_FLOOR((end_pt - size), align);
101 if (((end_pt - 1) & bmask) != (new_data_start & bmask))
102 return NULL;
103 }
104
105 new_elem_start = new_data_start - MALLOC_ELEM_HEADER_LEN; /* reserve room for the element header */
106
107 /* if the new start point is before the exist start, it won't fit */
108 return (new_elem_start < (uintptr_t)elem) ? NULL : (void *)new_elem_start;
109 }
这里计算是否有足够的可用内存(size),不包括tail_cookie
空间并进行对齐,最后还要空出一个承载struct malloc_elem
信息的内存大小。从这里可以看出因对齐等原因会造成内部碎片,并额外需要MALLOC_ELEM_HEADER_LEN
空间大小。find_suitable_element
从代码看,它优先返回hugepage大小满足flags要求的元素(check_hugepage_sz);页大小不匹配的元素只记为alt_elem,且仅在设置了RTE_MEMZONE_SIZE_HINT_ONLY时才会退而使用。如果找到足够可用的内存空间则:
199 /*
200 * reserve a block of data in an existing malloc_elem. If the malloc_elem
201 * is much larger than the data block requested, we split the element in two.
202 * This function is only called from malloc_heap_alloc so parameter checking
203 * is not done here, as it's done there previously.
204 */
205 struct malloc_elem *
206 malloc_elem_alloc(struct malloc_elem *elem, size_t size, unsigned align,
207 size_t bound)
208 {
209 struct malloc_elem *new_elem = elem_start_pt(elem, size, align, bound);
210 const size_t old_elem_size = (uintptr_t)new_elem - (uintptr_t)elem;
211 const size_t trailer_size = elem->size - old_elem_size - size -
212 MALLOC_ELEM_OVERHEAD;
213
214 elem_free_list_remove(elem);
215
216 if (trailer_size > MALLOC_ELEM_OVERHEAD + MIN_DATA_SIZE) { /* tail remainder is big enough to become a usable free element */
217 /* split it, too much free space after elem */
218 struct malloc_elem *new_free_elem =
219 RTE_PTR_ADD(new_elem, size + MALLOC_ELEM_OVERHEAD);
220
221 split_elem(elem, new_free_elem);
222 malloc_elem_free_list_insert(new_free_elem);
223 }
224
225 if (old_elem_size < MALLOC_ELEM_OVERHEAD + MIN_DATA_SIZE) { /* leading remainder too small to be reused */
226 /* don't split it, pad the element instead */
227 elem->state = ELEM_BUSY;
228 elem->pad = old_elem_size;
229
230 /* put a dummy header in padding, to point to real element header */
231 if (elem->pad > 0){ /* pad will be at least 64-bytes, as everything
232 * is cache-line aligned */
233 new_elem->pad = elem->pad;
234 new_elem->state = ELEM_PAD;
235 new_elem->size = elem->size - elem->pad;
236 set_header(new_elem);
237 }
238
239 return new_elem;
240 }
242 /* we are going to split the element in two. The original element
243 * remains free, and the new element is the one allocated.
244 * Re-insert original element, in case its new size makes it
245 * belong on a different list.
246 */
247 split_elem(elem, new_elem); /* the leading remainder stays free for later reuse */
248 new_elem->state = ELEM_BUSY;
249 malloc_elem_free_list_insert(elem);
250
251 return new_elem;
252 }
因为在分配内存时,进行了对齐等操作,导致有些额外的无法利用的内存空间,trailer_size
大小,其实是在elem_start_pt
中,end_pt
减去&tail_cookie
的距离,再加上MALLOC_ELEM_TRAILER_LEN
,这里尝试对碎片进行回收:
if (trailer_size > MALLOC_ELEM_OVERHEAD + MIN_DATA_SIZE)
,至少有MIN_DATA_SIZE(RTE_CACHE_LINE_SIZE)
个有效载荷放数据,则进行分割加入到freelist中:
122 /*
123 * split an existing element into two smaller elements at the given
124 * split_pt parameter.
125 */
126 static void
127 split_elem(struct malloc_elem *elem, struct malloc_elem *split_pt)
128 {
129 struct malloc_elem *next_elem = RTE_PTR_ADD(elem, elem->size); /* element physically after elem */
130 const size_t old_elem_size = (uintptr_t)split_pt - (uintptr_t)elem;
131 const size_t new_elem_size = elem->size - old_elem_size;
132
133 malloc_elem_init(split_pt, elem->heap, elem->ms, new_elem_size);
134 split_pt->prev = elem;
135 next_elem->prev = split_pt;
136 elem->size = old_elem_size;
137 set_trailer(elem); /* shrunken elem needs its trailer cookie rewritten */
138 }
当进行分割后,剩余内存空间可能不足MALLOC_ELEM_OVERHEAD + MIN_DATA_SIZE
,此时不能再利用,便设置ELEM_BUSY
并修正new_elem
;否则剩余的可以在以后分割;
rte_malloc
算是分析完成了,里面还是会存在竞争的情况,不过这里根据socket_id分成多个,这样当处于不同core取就近socket_id时加不同的锁,只有本地内存不够的情况下会尝试申请其他内存rte_spinlock_lock(&heap->lock)
;因为分配是要进行分割,那么释放可能要进行合并等操作。
57 /* Free the memory space back to heap */
58 void rte_free(void *addr)
59 {
60 if (addr == NULL) return; /* free(NULL) is a no-op */
61 if (malloc_elem_free(malloc_elem_from_data(addr)) < 0)
62 rte_panic("Fatal error: Invalid memory\n"); /* cookie check failed or double free */
63 }
121 static inline struct malloc_elem *
122 malloc_elem_from_data(const void *data)
123 {
124 if (data == NULL)
125 return NULL;
126
127 struct malloc_elem *elem = RTE_PTR_SUB(data, MALLOC_ELEM_HEADER_LEN); /* step back over the header */
128 if (!malloc_elem_cookies_ok(elem))
129 return NULL;
130 return elem->state != ELEM_PAD ? elem: RTE_PTR_SUB(elem, elem->pad); /* skip a pad header back to the real element */
131 }
108 #define RTE_PTR_SUB(ptr, x) ((void*)((uintptr_t)ptr - (x)))
103 /* check that the header and trailer cookies are set correctly */
104 static inline int
105 malloc_elem_cookies_ok(const struct malloc_elem *elem)
106 {
107 return elem != NULL &&
108 MALLOC_ELEM_HEADER(elem) == MALLOC_HEADER_COOKIE &&
109 MALLOC_ELEM_TRAILER(elem) == MALLOC_TRAILER_COOKIE;
110 }
这里在free内存时,根据data+MALLOC_ELEM_HEADER_LEN
作一定的偏移量,使之指向rte_malloc
时分配的起始处,并对head_cookie/tail_cookie
的校验;由于在分配内存时,剩下的内存可能不足以再次使用,所以这里根据ELEM_PAD
再作pad
大小的偏移量。
266 /*
267 * free a malloc_elem block by adding it to the free list. If the
268 * blocks either immediately before or immediately after newly freed block
269 * are also free, the blocks are merged together.
270 */
271 int
272 malloc_elem_free(struct malloc_elem *elem)
273 {
274 if (!malloc_elem_cookies_ok(elem) || elem->state != ELEM_BUSY) /* rejects corrupted elements and double frees */
275 return -1;
276
277 rte_spinlock_lock(&(elem->heap->lock));
278 size_t sz = elem->size - sizeof(*elem);
279 uint8_t *ptr = (uint8_t *)&elem[1];
280 struct malloc_elem *next = RTE_PTR_ADD(elem, elem->size);
281 if (next->state == ELEM_FREE){ /* coalesce with the following free block */
282 /* remove from free list, join to this one */
283 elem_free_list_remove(next);
284 join_elem(elem, next);
285 sz += sizeof(*elem); /* next's old header is now zeroable payload too */
286 }
288 /* check if previous element is free, if so join with it and return,
289 * need to re-insert in free list, as that element's size is changing
290 */
291 if (elem->prev != NULL && elem->prev->state == ELEM_FREE) { /* coalesce with the preceding free block */
292 elem_free_list_remove(elem->prev);
293 join_elem(elem->prev, elem);
294 sz += sizeof(*elem);
295 ptr -= sizeof(*elem); /* zeroed region now also covers this element's old header */
296 elem = elem->prev;
297 }
298 malloc_elem_free_list_insert(elem);
299
300 /* decrease heap's count of allocated elements */
301 elem->heap->alloc_count--;
302
303 memset(ptr, 0, sz); /* scrub the freed payload */
304
305 rte_spinlock_unlock(&(elem->heap->lock));
306
307 return 0;
308 }
如注释所说明的,在释放一个块时,如果前继或后继块是空闲的则会进行合并,以形成更大的空闲块插入到freelist中。该函数先判断是否是使用中才释放的,防止类似double free
情况。
并对所属的heap进行加锁操作,移到下一个块的起始处next
,如果next所指的块是free状态则把它从freelist中移走:
254 /*
255 * joing two struct malloc_elem together. elem1 and elem2 must
256 * be contiguous in memory.
257 */
258 static inline void
259 join_elem(struct malloc_elem *elem1, struct malloc_elem *elem2)
260 {
261 struct malloc_elem *next = RTE_PTR_ADD(elem2, elem2->size); /* element physically after elem2 */
262 elem1->size += elem2->size;
263 next->prev = elem1; /* back-pointer of the following element now targets the merged elem1 */
264 }
这里合并后的next节点块不再需要struct malloc_elem
头部信息,但还是要保留elem的头部信息,所以和后续节点合并后的可用大小是:((elem->size)-sizeof(*elem)) + next->size
这个还是挺好理解的,这里不像和前继节点合并要判断是否为null,因为在分配时,始终有个end_elem
节点作为最后一个标识节点。
合并前继节点其实和合并后继一样的过程,只不过需要额外调整ptr的地址,因为后面释放时,是进行了重新置0,这点比free/delete操作要好,否则是使用上一次脏的位模式。
最后把合并后大的空闲块再插入到freelist中,并释放锁,整个过程就是这样。
有些细节这里是跳过的,有兴趣的可以自己研究下dpdk,已经两年多没有再接触此相关工作,只是兴趣而已。
参考资料:
DPDK Slab Allocator and zero-copy
SGI STL 内存分配方式及malloc底层实现分析
如何实现一个malloc
TCMalloc:线程缓存的Malloc