学习一下类似于谷歌tcmalloc的高并发内存池。

项目代码都在GitHub上了,本文是对项目开发过程中的设计学习以及遇到问题的解决方法。

备注:当前项目在Linux上运行有些问题(mmap地址对齐没有解决),所以暂时只能在win32上开发!

1.什么是内存池

核心思想:内存池预先申请大块内存,当其他代码需要申请堆区内存时,不调用malloc/new,而是调用我们自己实现的内存池的接口来申请内存。

因为内存池里面已经预先申请了很多内存,所以它可以直接分配给其他模块。而分配已经申请的内存会比使用malloc/new向操作系统申请内存快非常多!

这就好比在家里屯一点纸巾,当纸巾没了,直接用家里的纸巾,不用去小卖部重新买了,自然会快一些。

而malloc/new这些能适配所有场景的内存申请函数,自然会有额外的性能损失,当一个系统对性能要求很高的时候,使用内存池来预先申请+分配内存,就可以节省不少的时间,提高系统运行效率。

因为我们是自己实现了一个内存池,相当于替代了malloc/new的工作,此时就可以使用底层的系统调用接口来直接向操作系统申请内存(malloc/new会有额外封装)

  • windows: VirtualAlloc
  • Linux: BRK或MMAP

分配内存后,如果是C++的对象,可以通过定位new来调用类的构造函数。

1.1 什么是高并发内存池

这个项目想实现的高并发内存池,就是在实现一个内存池的基础上,要满足多线程高并发请求空间申请时的性能和不出错。

这就涉及到多线程竞争以及加锁机制了,后文会补全。

1.2 内碎片和外碎片

  • 内碎片:线程池内部分配了却没有使用的内存
  • 外碎片:进程地址空间中,分配了多块内存,没有全部回收,导致剩余的内存虽然有足够大小,但不连续,无法实现大内存分配。

内碎片和外碎片的场景如下图所示

image-20240224103722887

假设一个进程申请了512KB的内存,却始终只使用了400KB,这里就出现了112KB的内碎片(申请了但没有完全使用);

外碎片就看图吧,因为分配给进程的空间并不是连续分配的,分配的两个内存空间之间可能会留有少部分的未使用内存,此时这些未使用内存并不连续,无法组合成大块空间,如果此时进程又申请大块空间,可能可用内存中剩余空间总量足够,但因为这些小碎片不连续,所以无法分配给进程。这种就叫内存的外碎片

2.定长内存池的实现

2.1 思路

对于定长内存池而言,实现比较简单

  • 申请大块空间;
  • 使用链表的方式来链接这些空间;
  • 当申请的时候,释放链表头部的空间;
  • 当销毁的时候,使用链表头插将空间复原到链表中;
  • 当空间不够用的时候(链表为空)重新申请大块空间;

为了实现链表的结构,定长内存池的单个空间分片的大小应该大于平台中一个指针的大小(不然没有办法实现指向下一块空间的地址)

因为内存池中的内存已经被分配出去了(会有碎片)所以内存池申请的内存是不能被释放的。如果一直没有模块从内存池中申请内存,就可能会产生内存的浪费。但这和内存池的功能相悖:如果系统申请内存的频率很低,那说明不需要内存池的介入。

只要最终内存池的进程是正常结束的,这些被申请的内存都会被操作系统托管和释放。

2.2 图解

初始化线程池时,会先申请大块内存,并用指针指向内存开头。

当有模块申请内存时,将内存开头的部分分配出去,并向后移动指针(同时还需将内存大小计数器给减小)

image-20240223130822219

当回收内存时,将另外一个指针FREELIST指向被回收内存的开头,并将被回收内存的前4/8个字节指向NULLPTR或者另外一个被回收的内存分片。

image-20240223131544965

当FREELIST为空,且用于标示预先申请的大块内存剩余容量的size为0的时候,就需要重新申请内存了(代表当前线程池已经没有可分配的内存)

image-20240223132634407

定长内存池的思路还是比较简单的。

2.3 代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#include <stdlib.h>

template<class T>
class FixMemoryPool
{
public:
T* New()
{
T* obj = nullptr;

// 优先把还回来内存块对象,再次重复利用
if (_freeList)
{
void* next = *((void**)_freeList);
obj = (T*)_freeList;
_freeList = next;
}
else
{
// 剩余内存不够一个对象大小时,则重新开大块空间
if (_remainBytes < sizeof(T))
{
_remainBytes = 128 * 1024;
_memory = (char*)malloc(_remainBytes);
if (_memory == nullptr)
{
throw std::bad_alloc();
}
}

obj = (T*)_memory;
size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
_memory += objSize;
_remainBytes -= objSize;
}

// 定位new,显示调用T的构造函数初始化
new(obj)T; // 即便是POD类型也是有个构造函数的

return obj;
}

void Delete(T* obj)
{
// 显示调用析构函数
obj->~T();

// 头插,使用void*强转能保证转换后的空间一定是个指针的大小
*(void*)obj = _freeList; // 指向原本的freelist开头的地址
_freeList = obj; // 更新头节点
}

private:
char* _memory = nullptr; // 指向大块内存的指针
size_t _remainBytes = 0; // 大块内存在切分过程中剩余字节数

void* _freeList = nullptr; // 还回来过程中链接的自由链表的头指针
};

2.4 性能测试

下面用循环申请内存的方式来测试一下这个定长内存池的效果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#include <iostream>
#include <cstdlib>
#include <vector>
#include <ctime>
using namespace std;

struct TreeNode
{
int _val;
TreeNode* _left;
TreeNode* _right;

TreeNode()
:_val(0)
, _left(nullptr)
, _right(nullptr)
{}
};

int main()
{
const size_t Rounds = 5; // 申请释放的轮次
const size_t N = 100000; // 每轮申请释放多少次

std::vector<TreeNode*> v1;
v1.reserve(N);
// new的耗时测试
size_t begin1 = clock();
for (size_t j = 0; j < Rounds; ++j)
{
for (int i = 0; i < N; ++i)
{
v1.push_back(new TreeNode);
}
for (int i = 0; i < N; ++i)
{
delete v1[i];
}
v1.clear();
}
size_t end1 = clock();

std::vector<TreeNode*> v2;
v2.reserve(N);
// 定长内存池的耗时测试
FixMemoryPool<TreeNode> TNPool;
size_t begin2 = clock();
for (size_t j = 0; j < Rounds; ++j)
{
for (int i = 0; i < N; ++i)
{
v2.push_back(TNPool.New());
}
for (int i = 0; i < N; ++i)
{
TNPool.Delete(v2[i]);
}
v2.clear();
}
size_t end2 = clock();

cout << "new cost time:" << end1 - begin1 << endl;
cout << "memory pool cost time:" << end2 - begin2 << endl;
return 0;
}

使用vs2019的x86-debug模式下运行,二者的时间消耗已经对比明显

1
2
new cost time:195
memory pool cost time:100

使用x86-release的模式运行,时间消耗差距就更大了

1
2
new cost time:21
memory pool cost time:1

这就是线程池存在的意义。

3.高并发内存池整体框架设计

简单实现了一个定长的内存池,下面就要学习一下如何实现一个高并发的内存池。谷歌的tcmalloc在多线程环境下会有更好的性能,我们模拟设计时也需要考虑相关的问题:

  • 性能应该优于malloc/new;
  • 多线程环境下竞争和锁申请问题;
  • 内存碎片问题;

最终的设计主要由下面三个部分构成

  1. ThreadCache:供每个线程独有,用于小于256KB的内存分配,因为每个线程独有一个ThreadCache,线程从这里申请内存时不需要加锁,效率高;
  2. CentralCache:中心缓存供所有线程共享,ThreadCache按需从CentralCache中获取内存。CentralCache在合适的时候回收分配给线程的ThreadCache,避免某个线程占用太多未使用的内存,达到多个线程均衡调度的目的。CentralCache存在多线程竞争,需要加锁;
  3. PageCache:以页为单位向操作系统申请内存。当CentralCache没有内存可分配时会向PageCache申请,PageCache会向操作系统申请一定的Page的内存放入span对象,这个大块内存会在span内部切割成多个定长大小的小块内存,最终给CentralCache的是span对象。当一个span的内部小块内存都被回收后,PageCache会回收CentralCache中满足条件的内存span对象,并合并相邻的页,组成更大的页,缓解内存碎片问题;

下图是这个框架的示意

image-20240223160514186

三层设计后,每一层的工作都不一样了

  • ThreadCache:为线程分配空间;
  • CentralCache:为ThreadCache分配空间并回收部分空间;
  • PageCache:分页管理和向操作系统获取空间;

其中,只有多个ThreadCache同时没内存需要向CentralCache申请的时候,才会出现加锁的问题。这种情况其实是不多的,所以CentralCache并不会有特别大的锁竞争问题。

而且,大部分情况下申请内存大小都不会大于256KB,所以ThreadCache是基本能满足线程申请空间的需求的。上述二者结合,就提高了多线程并发的效率。

4.ThreadCache设计

ThreadCache只对一个线程服务,它的设计思路和定长内存池其实是非常相似的。不过ThreadCache需要支持不定长内存的分配,需要对回收内存的FreeList做设计上的更正。

4.1 哈希桶

为了更加方便的管理不同大小的内存空间申请,ThreadCache采用了哈希桶的方式,以一定空间为分割,链接不同大小的回收内存,类似多个不同大小的定长自由链表。

image-20240224104548066

当线程需要8字节以下空间时,就从8字节的桶里面分配给它;当进程需要8到16字节空间时,就从16字节的桶里面分配,以此类推。

很明显,这种方式会产生内碎片,比如申请5字节空间,ThreadCache还是会从8字节的桶中分配给它,出现了3字节的空间浪费。

但这个空间浪费是必须接受的,否则就需要用更加复杂的方式来管理回收的空间(比如在内存中记录自己的大小),分配的时候还需要遍历找到合适大小的内存来分配,效率反而会降低。

4.2 分配映射规则

但是,如果整个哈希桶都用8字节来做分割,256KB/8B = 32768,最终整个哈希桶的数组部分就会非常非常长,本身占用的空间就很离谱了。

所以,需要采用一个让内碎片尽量保持在10%左右的哈希映射算法,减小哈希表长度的同时,尽可能的避免内碎片。

字节区间对齐方式哈希桶下标区间
[1, 128]B8B[0,16)
[128+1, 1024]B16B[16,72)
[1024+1, 8*1024]B128B[72,128)
[8*1024+1, 64*1024]B1024B[128,184)
[64*1024+1,256*1024]B8*1024B[184,208)

这个表的含义是,当字节范围在1到128之间时,哈希桶的每个下标对应的freelist内存大小相差8字节;当字节范围在129到1024字之间时,哈希桶每个下标对应的freelist内存大小相差16字节……

这样就可以控制内碎片在10%左右,以129字节到1024字节为例,最终映射的哈希桶如下图所示。

image-20240226093850295

当申请129到144字节之间的空间时,会分配144字节的内存块给线程。此时最大的空间浪费就是这144字节中只使用了129字节。浪费率大概是10.5%;其他区间的内碎片浪费率也是用这个方式计算。

注意:当我们考虑内碎片时,只考虑线程池设计会造成的内碎片问题,而不考虑线程本身申请了却不用造成的空间浪费(这是不可预期的,且和内存池的设计没有关系)。

4.3 代码-分配内存大小

下面是某个对齐区间内,计算需要分配的空间大小的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// size: 申请的块大小
// align: 当前对齐大小
size_t _RoundUp(size_t size, size_t align)
{
size_t returnSize;
if(size % align == 0)
{
returnSize = size; // 能直接模代表就是需要的最大值
}
else
{
// 其他,计算对齐区间内的最大值
returnSize = (size/align + 1) * align;
}
return returnSize;
}

使用这个函数的逻辑计算,当我们需要申请130字节空间时(对齐大小是16字节,应该返回144字节),这个函数计算出来的结果也是144。
$$
returnSize = (130/16 +1) * 16 = 144
$$
这个算法还有个更取巧的写法,用到了位运算,不好想不出来。位运算的效率会略高于乘除法,所以采用这种设计能提高一定的效率(已经是很深层的问题了)

1
2
3
4
size_t _RoundUp(size_t size, size_t align)
{
return ((size + align - 1) & (~(align - 1)));
}

这个算法是怎么实现的呢?以区间9到16为例(都需要分配16字节的空间),假设需要分配10字节空间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
10 + 8 - 1 = 17
align - 1 = 8-1 = 7

7的二进制如下
0000 0111
7的二进制取反后
1111 1000
17的二进制
0001 0001

二者相与,得到的结果就是16
1111 1000
0001 0001
——————————
0001 0000

这种牛逼的位运算方法我可想不出来……😣

有了单个对齐区间内计算大小的函数,剩下要做的就是把每个区间和对应的对齐大小给传参进入这个函数就行了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
size_t RoundUp(size_t size)
{
if(size <= 128)
{
return _RoundUp(size,8);
}
else if(size<=1024)
{
return _RoundUp(size,16);
}
else if(size <= (8*1024))
{
return _RoundUp(size,128);
}
else if(size <= (64*1024))
{
return _RoundUp(size,1024);
}
else if(size <= (256*1024))
{
return _RoundUp(size,8*1024);
}
else{
// 不是threadCache负责的范围了
return 0;
}
}

4.4 代码-计算哈希下标

除了计算需要分配的内存大小,还需要计算这个内存应该链接在哈希桶的哪一个下标。

  • 指定下标的freelist有剩余空间时直接分配;
  • 如果没有,则从预先申请的大块内存中申请;

单个区间内计算下标偏移量的方式如下

1
2
3
4
5
6
7
8
9
10
11
12
13
// size: 申请的块大小
// align: 当前对齐大小
size_t _Index(size_t size, size_t align)
{
if (size % align == 0)
{
return size / align - 1;
}
else
{
return size / align;
}
}

使用位运算时可以采用另外一个巧妙的设计方式。将1左移align_shift位相当于计算2的align_shift次方;

1
2
3
4
5
// align_shift是当前对齐大小是2的几次方,如果对齐大小是8,则传参3
size_t _Index(size_t size, size_t align_shift)
{
return ((size + (1 << align_shift) - 1) >> align_shift) - 1;
}

使用位运算的函数,最终的计算代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
size_t Index(size_t bytes)
{
assert(bytes <= 256*1024); // 不能超出ThreadCache的最大服务范围

// 用数组表示每个区间有多少个链
static int group_array[4] = { 16, 56, 56, 56 };
if (bytes <= 128){
return _Index(bytes, 3);
}
else if (bytes <= 1024){
return _Index(bytes - 128, 4) + group_array[0];
}
else if (bytes <= 8 * 1024){
return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
}
else if (bytes <= 64 * 1024){
return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
}
else if (bytes <= 256 * 1024){
return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
}
else{
return 0;// 不是threadCache负责的范围了
}

return 0;
}

4.5 ThreadCache分配/回收内存

这部分比较简答,哈希桶对应的FreeList里面有就直接分配,没有就去找CentralCache要。

这里使用assert进行申请大小的判断,是因为ThreadCache不应该接受超过256KB的请求(如果出现了说明外部调用的代码的处理有问题)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
static const size_t MAX_BYTES = 256 * 1024;

void* ThreadCache::Allocate(size_t size)
{
assert(size <= MAX_BYTES);
size_t alignSize = RoundUp(size); // 计算大小
size_t index = Index(size); // 计算下标

if (!_freeLists[index].Empty())
{
return _freeLists[index].Pop();
}
else
{ // 向centralcache申请内存
return FetchFromCentralCache(index, alignSize);
}
}

void ThreadCache::Deallocate(void* ptr, size_t size)
{
assert(ptr);
assert(size <= MAX_BYTES);

// 找对映射的自由链表桶,插入内存
size_t index = Index(size); // 计算下标
_freeLists[index].Push(ptr);
}

4.6 TLS-线程局部变量

说明

TLS的全称是Thread Local Storage,使用特殊的关键字来修饰一个变量,可以让这个变量变成一个只有某个线程可以访问的独立成员,其他线程无法访问。

比如我有个变量A,那么多个线程都会有一个自己的变量A,他们都可以访问这个A,但访问的并不是同一个,也就不涉及到加锁问题。

对于线程独有的ThreadCache而言,我们就可以使用这个特性来让每个线程独有一个变量,避免构建ThreadCache的时候还需要辨别当前线程号或加锁操作。

windows

1
static _declspec(thread) ThreadCache* TLSThreadCache = nullptr;

这里使用_declspec(thread)就是windows平台下声明TLS变量的方式,加上static是为了避免在包含头文件的时候该全局变量声明被复制多次导致重复。

微软官方文档:https://learn.microsoft.com/en-us/cpp/c-language/thread-local-storage?view=msvc-170&redirectedfrom=MSDN

用下面的代码对TLS变量做一个简单的测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <thread>
#include <cstdio>

_declspec(thread) int* TLStest = nullptr;

int main()
{
auto test_func = []() {
printf("before:%p\n", TLStest);
if (TLStest == nullptr)
{
TLStest = new int(10);
}
printf("after:%p\n", TLStest);
delete TLStest;
};

std::thread t1(test_func);
std::thread t2(test_func);
t1.detach();
t2.join();
std::this_thread::sleep_for(std::chrono::seconds(1));
return 0;
}

使用VS2019执行,输出结果如下所示,两个线程调用函数中的打印和初始化,能发现最终初始化出来的地址是不同的,而且&TLStest的地址也不一样,说明TLStest变量被每个线程独有。

1
2
3
4
[before] TLStest:00000000  &TLStest:00ACCEBC
[after] TLStest:00ACE0E0 &TLStest:00ACCEBC
[before] TLStest:00000000 &TLStest:00ACCDE4
[after] TLStest:00ACE1D0 &TLStest:00ACCDE4

Linux

在Linux下的线程本地存储可以用__thread关键字声明。但需要注意,这个关键字只能声明POD(内置类型),不能声明自定义的class对象。当然,自定义class的指针类型不在此列,因为myclass*这类指针类型始终是一个指针,属于C/C++的内置类型。

1
__thread int* TLStest = nullptr;

Linux下上述代码的测试结果,和windows下的效果类似。

1
2
3
4
5
❯ g++ test.cpp  -o test1 && ./test1
[before] TLStest:(nil) &TLStest:0x7f63315cd638
[after] TLStest:0x7f632c000f80 &TLStest:0x7f63315cd638
[before] TLStest:(nil) &TLStest:0x7f6330dcc638
[after] TLStest:0x7f632c000f80 &TLStest:0x7f6330dcc638

备注

windows和linux下线程局部变量会有自己的存放位置和存放的地址,但有一个问题是,这些线程局部变量的地址是全进程共享的。

假设线程A知道了线程B的局部变量的地址,那么线程A就可以访问并修改它!因为这个地址是在进程地址空间中的,整个进程共享!

5.CentralCache设计

5.1 span跨度页

CentralCache的主体也采用了和ThreadCache类似的哈希桶的数据结构,但是CentralCache的哈希桶下面链接的不是内存碎片,而是包含一个大块内存的span对象(跨度页)。

在CentralCache的哈希桶中,8字节的桶链接的是内部内存被拆分成8字节小块的Span对象。

image-20240228092725430

Span对象的基本成员如下所示,在win32环境下,一个Span对象是32字节。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#ifdef _WIN64
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
#endif

struct Span{
PAGE_ID _pageId; //页号
size_t _n; //页的数量

Span* _next = nullptr;
Span* _prev = nullptr;

void* _list = nullptr; // 链接span拆分的小块内存
size_t _useCount = 0; // 使用数量,为0代表没有被使用

size_t _objSize; // 拆分的小块内存的大小

bool _isUse = false; // 是否在占用
// 当Span被分配给CentralCache后设置为true
}

对这个对象做一定说明

  • 假设我们给定一个Span包含512KB的内存(内存是由PageCache申请的);
  • 那么8B的Span就是将这个512KB拆分成8B链接在Span对象内部的list中
  • 256KB的Span就是将512KB拆分成两个256KB链接在Span对象内部的list中
  • 当ThreadCache申请内存时,会从对应哈希桶下标位置的第一个Span对象中分配内存给它,如果第一个Span对象没有剩余内存了,就往后找第二个Span对象申请内存来分配给ThreadCache;
  • 当一个Span的useCount为0时,说明这个Span内部的内存都是没有被使用的,它可以被PageCache回收;
  • PageCache根据Span中的pageID来确定它的页号,可以和相邻页号的Span内存合并成更大的内存,暂为管理或释放给操作系统,一定程度上缓解内存碎片问题。

你可能会有一个疑问,Span对象中似乎没有用于存放大块内存的指针(内部的list并不是用来存大块内存起始地址的,而需要CentralCache来制作内存链表),当PageCache分配给CentralCache的时候,CentralCache要怎么知道这个Span的起始地址呢?

  • 页号是直接用内存地址强转成整形,除以8KB来计算的;
  • 只需要将页号乘以8KB(即8*1024),就能得到这个Span所对应的内存起始地址;
  • 页号+页的数量即可得到内存最后一页的起始地址,再加上8KB即为大块内存的末尾;

用这种方式,我们就节省了一个多余的指针。页号+页的数量也是后续PageCache回收内存时辨别两个Span对象所保存的页是否相邻的重要判据。

5.2 桶锁

为了进一步细化锁的粒度,CentralCache采用桶锁的设计,即每个不同大小的哈希桶下标都会有一个对应的锁

  • 线程A向CentralCache申请16B内存,线程B向CentralCache申请128KB内存,二者不在同一个哈希桶下标位置,所以不存在竞争也不需要加锁;
  • 线程A和线程B都向CentralCache申请16B内存,此时需要加锁;

定义桶锁比较简单,因为Span需要一个单独的List来管理(不能使用原本定长内存池里面的freelist对象了),我们在单独实现的SpanList里面加一个mutex,POP和PUSH的时候加锁/解锁就可以了。

5.3 向PageCache申请/释放内存

当CentralCache中的桶没有指定大小Span的时候,就需要去PageCache中申请内存。申请时需要给定申请的大小,随后PageCache会将包含对应大小内存的Span对象返还给CentralCache,由CentralCache来负责将这块内存切成小块并链接Span内部的list。

当某个Span对象下useCount为0的时候,CentralCache就可以将其归还给PageCache。

5.4 申请内存的数量限制

ThreadCache申请内存的时候会有一个“慢开始”的调节算法。即申请的时候,会略微申请多一些,但也不能一次性申请太多,否则很有可能用不完。

  • 当申请内存的size大的时候,一次向CentralCache申请的内存数量就越少(多少个size大小的内存)
  • 当申请内存的size小的时候,一次向CentralCache申请的内存数量就会多一些;

每申请一次内存,就将对应FreeList的阈值调高一些,这样内存池刚启动的时候,申请的内存就会少一些,运行一段时间后,认为系统对内存的需求会更加频繁,一次分配的内存就会多一些。

同时还需要用一个最高值来限制这个阈值,不能让它一直增长下去。否则容易出现某个线程A因为业务原因,在一段时间内申请了多次内存,但又有一段时间没有申请了,下一次申请时,原本只需要一丢内存,但系统因为该线程A的阈值高分配了过多用不上的内存,此时就容易造成内存池中的内存浪费,乃至其他线程的饿死情况。

5.5 回收ThreadCache中的内存

因为ThreadCache是用freelist来管理内存的,所以CentralCache的回收接口也是以一个链表的区间来回收。

1
void ReleaseListToSpans(void* start, size_t size);

当ThreadCache检测到当前某个哈希桶对应的freelist长度已经大于一次性会申请的内存长度(上文提到的FreeList阈值)时,就会释放一部分内存给CentralCache。这种情况说明ThreadCache保存的内存已经有点多了,可能会用不完。

6.PageCache设计

6.1 基本架构

PageCache的哈希桶和CentralCache基本一致,都是链接的Span对象,只不过PageCache中是通过页面数量作为哈希下标的选址的。

比如一个Span中有2页,那么它就会被映射到下标为2的哈希桶中(只不过这样会导致下标为0的哈希桶空置,也可以在哈希函数里面将页面数量减一来避免这个浪费)

除了哈希桶外,PageCache中还有一个unordered_map用于映射页号和Span对象地址,这样能让后续PageCache回收Span的时候,更快地找到这个Span页号相邻的其他Span对象的地址。

注意,这个unordered_map中需要映射一个Span对应的所有页号和Span地址的对应关系,不能只保存起始页号和Span地址的关系。

image-20240229104149463

6.2 申请内存

当PageCache的哈希桶中没有剩余Span时,就会向操作系统申请内存。

PageCache将以8KB为一个页,去向操作系统申请内存。且为了提高申请内存的效率,会直接使用系统底层接口来获取内存(Windows下VirutalAlloc,Linux下btk和mmap)

申请内存后,PageCache会new一个Span对象,并设置对应的页号和页的起始地址。随后是设置ID和Span对象地址的关系,并将Span返回给CentralCache。

1
2
3
4
5
// k是页的个数
void* ptr = SystemAlloc(k); // 向系统申请内存
Span* span = new Span; // 获取一个Span对象
span->_pageId = (PAGE_ID)ptr >> 13; // 相当于除以8KB,得到页号
span->_n = k; //设置页的个数

优化方式

当然,这里还有一个不太合适的地方。对于一个高并发的内存池而言,我们预期应该是会有很多个Span对象的构建的(即便我们最终的测试环境可能达不到这个并发量),所以这里应该将new Span改成使用上文提到的定长内存池来处理,可以在PageCache初始化的时候就申请一部分内存,供未来新建Span对象的时候使用。

1
Span* span = _spanPool.New();// 使用定长内存池来分配内存

只需要在PageCache中包括一个定长内存池的对象即可。

6.3 分配内存

分配内存的流程如下

  1. 判断哈希桶中对应页面大小的Span是否存在,存在直接分配;
  2. 不存在则向操作系统申请内存,而且每次都会按最大页面来申请(最大页128);
  3. 申请大块内存后,将其拆分成一个CentralCache需要的Span和另外一个剩余的Span;
  4. 将剩余的Span放入哈希表,CentralCache需要的Span返回;

上述步骤中的第三步就是拆分大Span对象的情况,比如当10页的Span没有时,将一个128页的Span拆分成10页和118页的。这时候,一个线程可能会访问PageCache中哈希桶多个下标位置的元素,所以PageCache不能采用桶锁,而采用全局锁(访问PageCache的时候加锁,访问结束解锁)。

另外,拆分Span对象是一个频率比较高的动作,这也是为什么PageCache中会出现多个可以进行合并的相邻页面的Span对象。

如果PageCache中不是一次性申请最大页面的内存,而是按需申请,此时操作系统给定的内存很大概率和之前的并不是相邻的,那PageCache自然就没有办法“合并相邻页面”,这也会导致操作系统中内存外碎片较多,且难以处理。

6.4 回收和合并

回收是CentralCache在Span中useCount为0的时候触发的,会将这个Span归还给PageCache。

PageCache收到一个Span后,需要进行合并操作

  • unordered_map中查找前一页的Span(当前页号减一),如果存在,判断是否可以进行合并;
  • 查找后续的Span(当前页号加上页面数量再加一),如果存在,判断是否可以进行合并;

因为这个unordered_map只在合并的时候需要使用,且每次合并时的查询操作都是当前span的页号减一,和当前span末尾页号+1,所以我们只需要给unoreder_map中设置当前页号以及末尾页号对应的span对象地址就行了!节省空间。

为了保险起见和方便理解,项目中我还是采用了遍历页号范围设置全部的方式,这样能避免出现问题😂但会有空间浪费和效率损失。

合并Span需要满足下面几个条件:

  • Span的isUse为false,即这个Span是处于PageCache中暂未分配的
  • Span的页面数量加上当前Span的页面数量不超过128(超过后会超出哈希桶的下标,无法管理,自然不能合并)

合并操作比较简单

  • 如果是合并前一个Span,将前一个Span的页面数量加上当前Span的页面数量即可;
  • 如果是合并后一个Span,将当前Span的页面数量加上后一个Span的页面数量即可;
  • 合并后要将被合并的Span对象释放,并将合并后的Span对象的isUse设置为false;
  • 将合并后的Span对象更换正确的哈希桶下标位置链接(因为包含的页面数量改变了);
  • 修改unodered_map中的页号和Span对象地址的映射表;

6.5 超出256KB的内存申请

因为低于256KB的内存都能被ThreadCache来处理,所以256KB也算是个分水岭。而PageCache中哈希桶最大的下标是128,即最大能托管的Span是1MB的内存,超出1MB的内存就得直接向操作系统申请了。

此时向内存池申请内存的流程会变成下面这样:

  • 申请低于256KB的调用ThreadCache的接口(包括分配和回收);
  • 大于256KB的调用PageCache的接口(包括分配和回收);
  • 当PageCache检测到申请的内存大于128Page的时候,直接向系统申请内存并返回;
  • 当PageCache检测到回收的内存大于128Page的时候,直接释放给操作系统;

到这里,内存池的主题框架就全部成型了,后续要做的就是性能测试和优化了。

6.6 SpanList和子类

因为只有CentralCache需要用到桶锁,而PageCache同意需要使用SpanList,为了节省一定空间(PageCache的SpanList中的锁是没用的),CentralCache我使用了继承自父类SpanList的另外一个类型,在里面添加了一个mutex锁。

在32位windows环境下,mutex类的大小是48字节,128个SpanList,还是能节省一定空间的。

1
2
std::mutex mtx;
cout << sizeof(mtx) << endl; // 32位下是48字节

7.测试

7.1 申请和释放函数

虽然写了三成的设计,但是申请和释放函数并不能直接调用ThreadCache的函数,还需要另外一个函数来进行ThreadCache的构造,以及大于256KB内存的处理。

这里有个复用设计,即将PageCache中的页号对应Span对象指针的映射的unordered_map来确定当前申请的内存的大小,这样就不需要用户主动传入参数了(主动传入容易出现内存泄漏和处理问题)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
static void* ConcurrentAlloc(size_t size)
{
if (size > MAX_SIZE)
{
size_t alignSize = SizeClass::RoundUp(size);
size_t kpage = alignSize >> PAGE_SHIFT;

PageCache::GetInstance()->Lock();
Span* span = PageCache::GetInstance()->NewSpan(kpage);
span->_objSize = size; // 申请大内存没有拆分
PageCache::GetInstance()->Unlock();

void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
return ptr;
}
else
{
// 因为是线程局部变量,所以不需要加锁
if (TLSThreadCache == nullptr)
{
// 这里的static变量是全局可见的
static FixedMemoryPool<ThreadCache> tcPool;
TLSThreadCache = tcPool.New();
}

return TLSThreadCache->Allocate(size);
}
}

static void ConcurrentFree(void* ptr)
{
Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
size_t size = span->_objSize;

if (size > MAX_SIZE)
{
PageCache::GetInstance()->Lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->Unlock();
}
else
{
assert(TLSThreadCache);
TLSThreadCache->Deallocate(ptr, size);
}
}

7.2 单线程申请

用下面的代码测试,环境是windows x64;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void TestNormalAlloc()
{
void* p1 = ConcurrentAlloc(6);
void* p2 = ConcurrentAlloc(8);
void* p3 = ConcurrentAlloc(12);
void* p4 = ConcurrentAlloc(7000);
void* p5 = ConcurrentAlloc(10);
void* p6 = ConcurrentAlloc(800);
void* p7 = ConcurrentAlloc(30);

cout << p1 << endl;
cout << p2 << endl;
cout << p3 << endl;
cout << p4 << endl;
cout << p5 << endl;
cout << p6 << endl;
cout << p7 << endl;

ConcurrentFree(p1);
ConcurrentFree(p2);
ConcurrentFree(p3);
ConcurrentFree(p4);
ConcurrentFree(p5);
ConcurrentFree(p6);
ConcurrentFree(p7);
}

测试结果如下,释放和申请都没有问题

1
2
3
4
5
6
7
000001A864B20000
000001A864B20008
000001A864B22000
000001A864B24000
000001A864B22010
000001A864B62000
000001A864BA0000

7.3 多线程申请

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
void MultiThreadAlloc1()
{
try{
std::vector<void*> v;
for (int i = 0; i < 50; i++)
{
void* ptr = ConcurrentAlloc(6);
cout << "1 - " << std::this_thread::get_id() << " - " << ptr << endl;
v.push_back(ptr);
}

for (auto e : v)
{
ConcurrentFree(e);
}
}
catch (...)
{
cout << "error in MultiThreadAlloc1\n";
return;
}
}

void MultiThreadAlloc2()
{
try {
std::vector<void*> v;
for (int i = 0; i < 50; i++)
{
void* ptr = ConcurrentAlloc(20);
cout << "2 - " << std::this_thread::get_id() << " - " << ptr << endl;
v.push_back(ptr);
}

for (auto e : v)
{
ConcurrentFree(e);
}
}
catch (...)
{
cout << "error in MultiThreadAlloc2\n";
return;
}
}

// 测试多线程处理是否有问题
void TestMultiThread()
{
std::thread t1(MultiThreadAlloc1);
std::thread t2(MultiThreadAlloc2);

t1.detach();
t2.join();
}

也没啥问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1 - 34788 - 0000019E47920000
2 - 14252 - 0000019E47922000
1 - 34788 - 0000019E47920008
1 - 34788 - 0000019E47920010
2 - 14252 - 0000019E47922018
1 - 34788 - 0000019E47920018
2 - 14252 - 0000019E47922030
1 - 34788 - 0000019E47920020
1 - 34788 - 0000019E47920028
1 - 34788 - 0000019E47920030
1 - 34788 - 0000019E47920038
1 - 34788 - 0000019E47920040
1 - 34788 - 0000019E47920048
2 - 14252 - 0000019E47922048
...

注意,进行多线程测试的时候,一定要保证主线程退出时间晚于子线程(比如在thread.join()之后加sleep),避免出现主线程提前退出的情况,这会导致子线程里面的打印不输出,或者其他的一些错误。

7.4 定长内存池段错误解决

刚开始运行的时候,多线程经常遇到段错误问题,而且不是每次都会出现。

一般遇到这种不是每次都会出现的段错误,就可以考虑是否为多线程并发问题了。

后来审查了一下,发现问题所在是申请内存函数中的ThreadCache初始化部分

1
2
3
4
5
6
// 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象
if (TLSThreadCache == nullptr)
{
static FixedMemoryPool<ThreadCache> tcPool; // 这里的static变量是全局可见的
TLSThreadCache = tcPool.New();
}

在初始化部分这里,设置了一个static的定长内存池,对于所有线程都是可见的。这时候就会出现多线程并发的问题(两个线程同时访问一个定长内存池)。

在PageCache中的Span定长内存池,因为访问PageCache的时候已经加过锁了,所以不会出现并发问题,但是这里的ThreadCache初始化时并没有加锁,就会出现多线程访问问题。

解决方案:

  1. 在定长内存池中加上一个mutex成员,并进行加锁解锁操作;
  2. 在申请内存初始化ThreadCache的部分添加一个锁,并进行加锁解锁操作;

采用第一种方式设置完毕后,连续运行了十几次,都没有再出现段错误问题了。

后续为了优化性能,我改为了第二种方式,在访问ThreadCache的定长内存池部分加锁。因为PageCache中访问Span对象的定长内存池的加锁是无意义的,会有额外性能损失。

当然,这里不使用内存池也没问题,毕竟申请ThreadCache并不算一个高频操作。

1
2
3
4
5
6
7
8
9
// 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象
if (TLSThreadCache == nullptr)
{
// C++11后static变量初始化时线程安全
static FixedMemoryPool<ThreadCache> tcPool; // 这里的static变量是全局可见的
// 在new之前加锁就可以了
std::unique_lock<std::mutex> lockGuard(TLSThreadCacheMtx);
TLSThreadCache = tcPool.New();
}

8.Linux下接口补全

前文所述的接口都是在Windows下使用的,下面要对Linux中与Windows不一致的地方进行补全。

8.1 PageID的typedef

之前在Windows下是用_WIN32_WIN64这两个宏来确定平台是32位还是64位的,但是Linux下没有类似的宏。最终采用如下方式来确认Linux下平台的位数

1
2
3
4
5
6
7
8
9
10
11
12
13
// 因为64位和32位中内存地址的长度不一样,所以对应类型也得不一样
#ifdef _WIN64 // 64位的win才会有这个宏
typedef unsigned long long PageID;
#elif _WIN32 // 32和64位的win都会有这个宏
typedef size_t PageID;
// 后续还需要插入一些Linux的宏
#elif __linux__
#if __WORDSIZE == 64 // 64位的Linux
typedef unsigned long long PageID;
#else // 32位的Linux
typedef size_t PageID;
#endif
#endif

这里使用了__WORDSIZE这个宏,它其实比较通用,含义是当前平台下指针的大小(单位是比特),32位平台下指针是32比特4字节,64位平台下指针是64比特8字节。

8.2 mmap申请和释放内存

在Linux下使用mmap和munmap两个接口来申请和释放内存,具体使用方式参考man手册。

1
2
3
4
5
#include <sys/mman.h>

void *mmap(void *addr, size_t length, int prot, int flags,
int fd, off_t offset);
int munmap(void *addr, size_t length);

mmap的参数说明:

  • 第一个参数addr: 指定需要申请的内存在操作系统中的偏移量,设置为NULL让操作系统自行选择;
  • 第二个参数length: 指定申请内存的比特数
  • 第三个参数prot: 指定申请内存的权限(读写)
  • 第四个参数flags: 指定申请内存的方式
  • 第五个参数fd: 申请的内存是否需要和某个文件描述符绑定,-1代表不绑定;
  • 第六个参数offset:指定偏移,对于匿名映射通常设置为0;

释放内存的munmap接口就比较简单了,传入内存的指针和长度就可以了。

示例代码如下

1
2
3
4
5
6
size_t bytes = 1000;
// PROT_READ | PROT_WRITE 指定读写权限
// MAP_PRIVATE | MAP_ANONYMOUS 私有和匿名映射
void *ptr = mmap(NULL, bytes, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
// 释放
munmap(ptr, bytes);

因为windows下的VirtualFree并不需要传入长度,所以在Linux中改造内存池的时候,还需要一个map来存放指针其实地址和长度的关系,保证释放的时候能按正确的内存长度进行释放。

8.3 TLS变量

这一点前文已经提到过区别,Linux下和Windows的代码有一定差异。

1
2
3
4
5
#ifdef _WIN32
static _declspec(thread) ThreadCache *TLSThreadCache = nullptr;
#elif __linux__
static __thread ThreadCache *TLSThreadCache = nullptr;
#endif

8.4 系统内存分配和释放地址不一致问题(未解决)

当直接申请大块内存(比如2000KB)的时候,PageCache会调用SystemAlloc函数,不使用malloc/new而直接调用系统接口来申请大块内存。

这里就出现了问题,申请内存的时候,操作系统返回的内存并不是严格按我们预定的8KB页来起始的,比如下面的0xf7617000这个地址;

1
2
3
alloc ptr: 0xf7617000 - 2048000
2000KB: 0xf7616000
free ptr: 0xf7616000

0xf7617000除以8KB再乘以8KB,得到的结果是0xf7616000,和原本分配的内存不一致!因为PageCache中申请内存的时候会将其按页号来保存在Span对象里面,但操作系统返回的内存并不一定是某个页的起始地址

image-20240304130129640

而且会发现最终调用方得到的内存是0xf7616000,这个地址小于操作系统分配给我们的0xf7617000,如果直接访问,会访问到并不属于当前已分配内存的部分,直接段错误!

这个问题大概率是由Windows和Linux下系统调用接口的不同设计导致的,因为windows下不管是用x86和x64都没有复现出这个问题(在Linux下会有60%的几率出现该问题)

而且该问题影响的不只是超过1MB的大内存申请,PageCache申请内存的时候也有可能会遇到系统调用接口返回的内存并非按8KB对齐的情况,最终导致段错误。

由于这不是线程池设计中的重点,也和线程池本身的思路没有关系,所以暂时挂起不去深究,继续在Windows上以32位对线程池进行优化

9.性能问题

9.1 简单测试

使用下面的代码,在多线程环境下分别测试线程池申请和malloc申请内存的耗时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
void MultiThreadAlloc2()
{
try {
std::vector<void*> v1;
std::vector<void*> v2;

size_t begin1 = clock();
for (int i = 0; i < 100; i++)
{
void* ptr = ConcurrentAlloc(30);
cout << "21 - " << std::this_thread::get_id() << " - " << ptr << endl;
v1.push_back(ptr);
}

for (auto e : v1)
{
ConcurrentFree(e);
}
size_t end1 = clock();

size_t begin2 = clock();
for (int i = 0; i < 100; i++)
{
void* ptr = malloc(30);
cout << "22 - " << std::this_thread::get_id() << " - " << ptr << endl;
v2.push_back(ptr);
}

for (auto e : v2)
{
free(e);
}
size_t end2 = clock();

std::this_thread::sleep_for(std::chrono::seconds(2));
cout << "2 memory pool cost time:" << end1 - begin1 << endl;
cout << "2 malloc cost time:" << end2 - begin2 << endl;
}
catch (...)
{
cout << "error in MultiThreadAlloc2\n";
return;
}
}

在vs2019的debug-x86模式,测试结果如下

1
2
3
4
2 memory pool cost time:98
2 malloc cost time:91
1 memory pool cost time:44
1 malloc cost time:95

会发现线程2的malloc申请时间小于线程池,而线程1的线程池效率更高。再测试一次,又会发现两个线程malloc的效率都更高了。

1
2
3
4
2 memory pool cost time:80
2 malloc cost time:77
1 memory pool cost time:69
1 malloc cost time:61

这明显不是我们想要的结果,如果一个内存池的效率低于malloc,它存在的意义就不是很大了。所以需要针对当前线程池中的性能瓶颈进行一定分析和优化。

9.2 VS2019性能查看器

VS2019的调试选项中有一个性能探查器,它可以帮助我们确定当前项目中哪一个部分最耗时。

image-20240306090902495

调整到DEBUG模式下,选择检测,点击开始

image-20240306091010082

image-20240306091438569

最终选择当前项目并等待运行后,会得到一个下面这样的报告。注意使用性能监看器的时候应该把多线程代码内部的打印和sleep给去掉(否则会显示打印和sleep的耗时很长)

image-20240306091039084

上图是只有两个线程时的报告,下图是4个线程同时运行时的报告

image-20240306141404925

这里就能看到,ConcurrentFree函数占用了很大的CPU时间

image-20240306091851440

image-20240306091802354

在ConcurrentFree函数中,MapObjectToSpan函数和ThreadCache::Deallocate函数占用的时间比较长。

进一步查看,在ThreadCache::Deallocate函数中,耗时的也是MapObjectToSpan这个HashMap的查询操作。

image-20240306092056267

MapObjectToSpan函数中,占用时间长的是加锁和find的操作

image-20240306092009240

如果能省去这里的加锁,那么内存池释放内存的效率就会大大提高

这时候就需要引入另外一种数据结构了:基数树 radix_tree。

9.3 基数树

数据结构说明

在PageCache中的unordered_map是用于映射页号和对应Span对象指针的,这两个本质上都是一个整形。基数树就比较适合这样的场景。

  • 单层基数树其实就是一个“直接定址法”的哈希数组,用页面数量作为数组长度,每个数组的元素都是一个指针;

在32位系统下,这个数组的长度是32-PAGE_SHIFT,对于当前项目而言PAGE_SHIFT是13,则最终数组的长度是2^19,每个元素都是一个4字节的指针类型,数组的大小就是2048KB,即2MB。这个大小并不算大,可以接受。

很明显,使用单层基数树,时间复杂度就直接被压缩到了数组下标直接访问的O(1),比unordered_map提供的哈希表效率更高(因为单层基数树完全没有哈希碰撞和扩容问题)。

  • 多层基数树利用页号的比特位来分层

如下所示,在32位下,页号只有低19位是有意义的。因为页号是由32位地址右移13位得来的。

image-20240306145039892

将低的十九位进一步分为高5位,低13位;这里的高5位就作为第一层的映射,低13位作为第二层的映射

  • 第一层的长度为2的5次方;
  • 第二层的长度为2的13次方;

最终基数树的数组长度是2^5 * 2^13 = 262,144,占用空间大小是1024KB,即1MB。分层不仅节省了空间,而且还避免了一次性申请大块内存(如果是单层基数树需要一次性把数组的空间申请出来)。

image-20240306150859855

更多层的基数树都是按上述思路往下扩展,如果是六十四位的系统就需要用更多层的基数树。

基数树因为每一个页号都有一个单独的存储空间,在修改的时候只会修改局部,并不影响整体。而且和红黑树这种数据结构有一点不同,红黑树为了保证key的有序,在插入的时候可能需要进行树结构的旋转,假设查询的时候遇到这种旋转,效率就会变低甚至出现错误,这也是为什么使用map的时候需要加锁unoredered_map也是同理,遇到哈希碰撞和空间不足的时候,都会修改原本数据的存放结构,为了避免多线程问题,就需要加锁才能进行查询。

而基数树不管是插入还是删除操作都不会影响整个树的结构,你可以理解为基数树创建完毕之后整个树的结构就不会动了,这一点就让它在一定程度上是线程安全的。只要多线程不会同时读写相同页号的数据,就不会出现并发问题。

回到项目

再来看看项目中什么时候需要设置映射,什么时候会查询映射?

设置页号和Span对象的指针函数为SetMapObjectToSpan,只在PageCache的NewSpan和ReleaseSpanToPageCache函数中被调用。而访问PageCache的时候都会加锁,这里对映射的写操作不存在多线程问题。

image-20240306152054722

查询操作都是在ConcurrentFreeCentralCache::ReleaseListToSpans这两个内存释放函数中。

image-20240306153145300

PageCache中什么时候会修改映射:

  • 将对应大小Span分配给CentralCache时;
  • 将大Span拆成小Span,并分配给CentralCache时;
  • 回收时将小Span与相邻Span合并时;

而查询操作都是在回收内存的时候出现的,被回收的内存一定不会是刚刚准备被分配的Span对象,因为分配内存后的Span对象就会从PageCache的SpanList中删除,PageCache不管是回收合并操作还是拆分操作都不会遇到这个不存在于SpanList中的Span对象。换而言之,当前映射的写操作和读操作不会同时访问相同页号的位置!而基数树的特性让这种读写不同下标位置的操作无需加锁!

去掉了锁竞争,效率就提高了!

代码实现

单层基数树,其实就是一个数组,给定一个Get和一个Set,就可以了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 单层基数树,利用模板变量来获取长度
template<size_t BITS>
class PageMap1
{
private:
static const size_t LENGTH = 1 << BITS; // 用于确定基数树的长度
void** _array; // 一个数组,数组内的每个元素都是一个void*指针
public:
PageMap1()
{
size_t size = sizeof(void*) * LENGTH; // 整个数组的长度
size_t alignSize = SizeClass::RoundUp(size); // 调用SystemAlloc申请内存的时候需要按页对齐
_array = (void**)SystemAlloc(alignSize >> PAGE_SHIFT); // 为了可移植性,使用C语言的强制转换
memset(_array, 0, size); // 清零
}
// 这里采用uintptr_t,保证可以覆盖当前平台上的指针数量
typedef uintptr_t Number;

// set的时候需要保证key有效
void Set(Number key, void* value)
{
_array[key] = value;
}

void* Get(Number key)
{
// 超过当前的允许范围了
if ((key >> BITS) > 0) {
return NULL;
}
return _array[key];
}
};

9.4 再次性能测试

将PageCache中的unordered_map改为单层基数树,然后再进行测试。这次使用更好的自动创建线程来并发申请的方式进行测试,方便修改测试变量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
// malloc多线程测试
// ntimes 一轮申请和释放内存的次数
// nworks 线程数量
// rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime = 0;
std::atomic<size_t> free_costtime = 0;

for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&, k]() {
std::vector<void*> v;
v.reserve(ntimes);

for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
v.push_back(malloc(35));
//v.push_back(malloc((35 + i) % 8192 + 1));
}
size_t end1 = clock();

size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
free(v[i]);
}
size_t end2 = clock();
v.clear();

malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}

for (auto& t : vthread)
{
t.join();
}

printf("%u 个线程并发执行 %u 轮次,每轮 malloc %u 次,耗时:%u ms\n",
nworks, rounds, ntimes, malloc_costtime.load());

printf("%u 个线程并发执行 %u 轮次,每轮 free %u 次,耗时:%u ms\n",
nworks, rounds, ntimes, free_costtime.load());

printf("%u 个线程并发执行malloc/free %u 次,总计耗时:%u ms\n",
nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}


// 内存池多线程测试
// ntimes 一轮申请和释放内存的次数
// nworks 线程数量
// rounds 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime = 0;
std::atomic<size_t> free_costtime = 0;

for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&]() {
std::vector<void*> v;
v.reserve(ntimes);

for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
v.push_back(ConcurrentAlloc(35));
//让每一次申请的内存大小不一样
//v.push_back(ConcurrentAlloc((35 + i) % 8192 + 1));
}
size_t end1 = clock();

size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
ConcurrentFree(v[i]);
}
size_t end2 = clock();
v.clear();

malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}

for (auto& t : vthread)
{
t.join();
}

printf("%u 个线程并发执行 %u 轮次,每轮内存池申请 %u 次,耗时:%u ms\n",
nworks, rounds, ntimes, malloc_costtime.load());

printf("%u 个线程并发执行 %u 轮次,每轮内存池释放 %u 次,耗时:%u ms\n",
nworks, rounds, ntimes, free_costtime.load());

printf("%u 个线程并发执行内存池申请和释放 %u 次,总计耗时:%u ms\n",
nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}

void BenchMark()
{
size_t n = 10000;
BenchmarkConcurrentMalloc(n, 4, 10);
cout << endl << endl;

BenchmarkMalloc(n, 4, 10);
}

以下是使用uordered_map的测试结果(DEBUG-X86),性能远低于malloc/free。

1
2
3
4
5
6
7
8
4 个线程并发执行 10 轮次,每轮内存池申请 100000 次,耗时:4756 ms
4 个线程并发执行 10 轮次,每轮内存池释放 100000 次,耗时:25577 ms
4 个线程并发执行内存池申请和释放 4000000 次,总计耗时:30333 ms


4 个线程并发执行 10 轮次,每轮 malloc 100000 次,耗时:6064 ms
4 个线程并发执行 10 轮次,每轮 free 100000 次,耗时:1797 ms
4 个线程并发执行malloc/free 4000000 次,总计耗时:7861 ms

以下是使用单层基数树的测试结果(DEBUG-X86),可以看到此时我们的内存池效率就完美体现出来了。

1
2
3
4
5
6
7
8
4 个线程并发执行 10 轮次,每轮内存池申请 10000 次,耗时:177 ms
4 个线程并发执行 10 轮次,每轮内存池释放 10000 次,耗时:72 ms
4 个线程并发执行内存池申请和释放 400000 次,总计耗时:249 ms


4 个线程并发执行 10 轮次,每轮 malloc 10000 次,耗时:644 ms
4 个线程并发执行 10 轮次,每轮 free 10000 次,耗时:181 ms
4 个线程并发执行malloc/free 400000 次,总计耗时:825 ms
1
2
3
4
5
6
7
8
4 个线程并发执行 10 轮次,每轮内存池申请 100000 次,耗时:1899 ms
4 个线程并发执行 10 轮次,每轮内存池释放 100000 次,耗时:796 ms
4 个线程并发执行内存池申请和释放 4000000 次,总计耗时:2695 ms


4 个线程并发执行 10 轮次,每轮 malloc 100000 次,耗时:5947 ms
4 个线程并发执行 10 轮次,每轮 free 100000 次,耗时:2298 ms
4 个线程并发执行malloc/free 4000000 次,总计耗时:8245 ms

改成Release模式重新测试一下,可见Release模式对malloc操作有一定优化,二者差距没有那么高,不过已经能体现当前内存池的性能还不错了。

1
2
3
4
5
6
7
8
4 个线程并发执行 10 轮次,每轮内存池申请 100000 次,耗时:169 ms
4 个线程并发执行 10 轮次,每轮内存池释放 100000 次,耗时:118 ms
4 个线程并发执行内存池申请和释放 4000000 次,总计耗时:287 ms


4 个线程并发执行 10 轮次,每轮 malloc 100000 次,耗时:165 ms
4 个线程并发执行 10 轮次,每轮 free 100000 次,耗时:189 ms
4 个线程并发执行malloc/free 4000000 次,总计耗时:354 ms

再用性能监视器查看,发现性能消耗最大的已经是vector的push操作了,原先耗时的Map查询操作现在占比已经不高了。

image-20240306214130138

9.5 线程模拟负载

上文9.4中的线程只是在申请和释放内存,并没有做其他的工作。面试是时候问道了就很是尴尬,我们的线程池只是在重复的进行申请和释放操作,却没有具体业务,最终测出来的性能只能停留在理论上。

所以需要想一个办法来模拟高并发下每个线程的干活的耗时。因为不同耗时,不同的处理间隔才能更好的复现实际场景。

我能想到的是用一个随机数生成器,随机生成一个休眠时间让线程休眠,添加到线程每次申请内存之后,模拟每次都需要休眠一段时间(干活)再继续申请的情况。

但是这就导致每一次申请和释放的代码之中会有一个休眠,我们应该如何保证malloc和内存池测试时休眠的时长一致呢?如果不一致,结果就没有参考性了。

这里我的想法是用一个atomic变量的总休眠耗时计数器,在malloc和内存池测试中都初始化为相同值,这样每次休眠的时间就可控了(最终总的休眠时长基本一致)。

不过具体写下来感觉很奇怪,一个休眠直接把malloc和内存池的性能拉到一致了……暂时没有明白具体的性能瓶颈在哪里

The end

内存池的基本学习和项目的实现内容到这里就OVER啦!