布鲁斯的技术小屋

leveldb(三) 内存序

在(二)中讲了skiplist,其实跳过了颇为重要的一部分

为了保证skiplist的线程安全

skiplist中对两个变量使用了原子变量

1
2
std::atomic<int> max_height_; // Node的最大高度
std::atomic<Node*> next_[1]; // Node中每层的next节点

涉及这两个变量的存取涉及到了三种内存序

  1. std::memory_order_release
  2. std::memory_order_relaxed
  3. std::memory_order_acquire

下面讲一下内存序

内存序

三级cache

缓存一致性

正是这样的SMP架构(共享多处理器),cache存在本地(L1/L2)的和共享(L3),因而存在一致性的问题
即core1读到数据A后,进行修改,没有及时写回共享cache(L3),此时倘若core2读数据A,这时候读到的就是旧值,此时core1的写和core2的读就没有完成同步

内存屏障

C++11中的内存序

c++11中引入了六种内存序

  1. relaxed
  2. acquire
  3. release
  4. consume
  5. acq_rel
  6. seq_cst

六种内存序实际上实现了三种内存模型

  1. relaxed(要求最低,性能最好)
  2. acquire release
  3. sequential consistent

他们保证了个啥呢

(以下感谢如何理解 C++11 的六种 memory order? - 知乎 (zhihu.com),同时参考了std::memory_order - cppreference.com

  1. relaxed order
    1. 只保证了单个线程内的原子操作是顺序的
    2. 线程间呢?你想咋搞就咋搞
  2. release–acquire
    1. 类似于mutex的lock和unlock操作
    2. 线程A原子性的存入X (release),线程B原子性的读X(acquire),(其实就是将新值从local cache中更新到了shared cache,而且读的线程也从shared cache中获取新值嘛)
    3. 同时带来一个副作用——线程A中存入X之前的操作,在线程B执行读取X之后都能看见(因为线程A这时的local cache已经更新了,而且线程B的cache也从shared cache中更新)
  3. release–consume
    1. release–acquire这个副作用太强了,我想避免这个开销怎么办?
    2. release–consume中的副作用变成了依赖读和依赖写,就是线程A中,存入X之前的依赖X的写操作,在线程B中读取X后,依赖X的读操作都能看到
  4. seq_cst

下面结合LevelDB中的一个例子

1
2
3
4
5
6
7
8
9
10
11
Node* Next(int n) {
assert(n >= 0); // 1
// 使用acquire load保证访问的是初始化了的node
return next_[n].load(std::memory_order_acquire); // 2
}
void SetNext(int n, Node* x) {
assert(n >= 0); // 3
// 使用release store保证使用这个函数
// 来进行next的插入的都是已经初始化了的node
next_[n].store(x, std::memory_order_release); // 4
}
  • Next函数返回在高度n下的下一个node节点

  • SetNext函数设置在高度n下的下一个节点为x

  • 这里通过release-acquire,保证了三个东西

    • 1的运行一定在2之前,即通过Next(n)读到的node一定是已经初始化了的
    • 3的运行一定在4之前,保证SetNext(n)设置的node一定是已经初始化了的
    • 同时在release的store之前的操作,在acquire的load之前都可见(就是上面说的synchronize with语义

同时,由于性能的优化,还实现了no barrier版本的

1
2
3
4
5
6
7
8
Node* NoBarrier_Next(int n) {
assert(n >= 0);
return next_[n].load(std::memory_order_relaxed);
}
void NoBarrier_SetNext(int n, Node* x) {
assert(n >= 0);
next_[n].store(x, std::memory_order_relaxed);
}

而且实际上insert调用的就是无屏障版本的(显然是出于性能的考虑)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
template <typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key& key) {
// TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
// here since Insert() is externally synchronized.
Node* prev[kMaxHeight];
Node* x = FindGreaterOrEqual(key, prev);

// Our data structure does not allow duplicate insertion
assert(x == nullptr || !Equal(key, x->key));

int height = RandomHeight();
if (height > GetMaxHeight()) {
for (int i = GetMaxHeight(); i < height; i++) {
prev[i] = head_;
}
// It is ok to mutate max_height_ without any synchronization
// with concurrent readers. A concurrent reader that observes
// the new value of max_height_ will see either the old value of
// new level pointers from head_ (nullptr), or a new value set in
// the loop below. In the former case the reader will
// immediately drop to the next level since nullptr sorts after all
// keys. In the latter case the reader will use the new node.
max_height_.store(height, std::memory_order_relaxed);
}

x = NewNode(key, height);
for (int i = 0; i < height; i++) {
x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i)); // x->next = prev->next
prev[i]->SetNext(i, x); // prev->next = x
}
}

insert
其中作者也解释了为什么此处可以使用无屏障版本的(无需同步)
实际上insert可以分为两步

  1. 将新节点的next设为当前的next
  2. 将prev的next设为当前节点
    倘若发生了1,还没同步,这时候其他并发的线程读到的next还是next,不影响下一步操作