leveldb supports concurrent foreground reads, but writes are serialized.

The leveldb memtable supports readers running concurrently with the (single) writer via a lock-free skiplist.

leveldb has exactly one background thread, and it handles only one kind of task: compaction. Minor compaction and major compaction go through the same dispatch path, with minor compaction taking priority.
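That dispatch can be seen in DBImpl::BackgroundCompaction, condensed below from db/db_impl.cc (the elided parts handle manual compaction and the body of the major compaction; exact shape may vary slightly across leveldb versions):

    void DBImpl::BackgroundCompaction() {
      mutex_.AssertHeld();

      // Minor compaction first: flush the immutable memtable if there is one.
      if (imm_ != nullptr) {
        CompactMemTable();
        return;
      }

      // Otherwise pick a major compaction across sstable levels.
      Compaction* c = versions_->PickCompaction();
      // ... merge the input files, write new sstables, install a new version ...
    }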

Thread scheduling and background compaction

The scheduling function

util/env_posix.cc

PosixEnv::Schedule

  1. If background_thread has not been created yet, create it.
  2. background_thread runs BackgroundThreadEntryPoint, which pops tasks from background_work_queue_ and executes them; when the queue is empty it blocks in background_work_cv_.Wait().
    void PosixEnv::Schedule(
        void (*background_work_function)(void* background_work_arg),
        void* background_work_arg) {
      background_work_mutex_.Lock();

      // Start the background thread, if we haven't done so already.
      if (!started_background_thread_) {
        started_background_thread_ = true;
        std::thread background_thread(PosixEnv::BackgroundThreadEntryPoint, this);
        background_thread.detach();
      }

      // If the queue is empty, the background thread may be waiting for work.
      if (background_work_queue_.empty()) {
        background_work_cv_.Signal();
      }

      background_work_queue_.emplace(background_work_function, background_work_arg);
      background_work_mutex_.Unlock();
    }

    static void BackgroundThreadEntryPoint(PosixEnv* env) {
      env->BackgroundThreadMain();
    }

    void PosixEnv::BackgroundThreadMain() {
      while (true) {
        background_work_mutex_.Lock();

        // Wait until there is work to be done.
        while (background_work_queue_.empty()) {
          background_work_cv_.Wait();
        }

        assert(!background_work_queue_.empty());
        auto background_work_function = background_work_queue_.front().function;
        void* background_work_arg = background_work_queue_.front().arg;
        background_work_queue_.pop();

        background_work_mutex_.Unlock();
        background_work_function(background_work_arg);
      }
    }

Where PosixEnv::Schedule is called

  1. DBImpl::MaybeScheduleCompaction (the full function is shown after this list):

        background_compaction_scheduled_ = true;
        env_->Schedule(&DBImpl::BGWork, this);
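For context, the full MaybeScheduleCompaction (quoted from db/db_impl.cc; comments are leveldb's) shows that at most one compaction is ever outstanding:

    void DBImpl::MaybeScheduleCompaction() {
      mutex_.AssertHeld();
      if (background_compaction_scheduled_) {
        // Already scheduled
      } else if (shutting_down_.load(std::memory_order_acquire)) {
        // DB is being deleted; no more background compactions
      } else if (!bg_error_.ok()) {
        // Already got an error; no more changes
      } else if (imm_ == nullptr && manual_compaction_ == nullptr &&
                 !versions_->NeedsCompaction()) {
        // No work to be done
      } else {
        background_compaction_scheduled_ = true;
        env_->Schedule(&DBImpl::BGWork, this);
      }
    }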

DBImpl::BGWork ends up running both minor and major compaction.

In other words, compaction is the only work ever scheduled onto the background thread. The call chain from BGWork down is sketched below.
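The chain from BGWork to the actual compaction (quoted from db/db_impl.cc; comments are leveldb's):

    void DBImpl::BGWork(void* db) {
      reinterpret_cast<DBImpl*>(db)->BackgroundCall();
    }

    void DBImpl::BackgroundCall() {
      MutexLock l(&mutex_);
      assert(background_compaction_scheduled_);
      if (shutting_down_.load(std::memory_order_acquire)) {
        // No more background work when shutting down.
      } else if (!bg_error_.ok()) {
        // No more background work after a background error.
      } else {
        BackgroundCompaction();
      }

      background_compaction_scheduled_ = false;

      // Previous compaction may have produced too many files in a level,
      // so reschedule another compaction if needed.
      MaybeScheduleCompaction();
      background_work_finished_signal_.SignalAll();
    }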

PosixEnv::StartThread simply spawns a new thread to run thread_main:

    void StartThread(void (*thread_main)(void* thread_main_arg),
                     void* thread_main_arg) override {
      std::thread new_thread(thread_main, thread_main_arg);
      new_thread.detach();
    }

StartThread is only called by tests and benchmarks.
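A minimal usage sketch via the Env interface (the task function and its argument here are hypothetical, not from leveldb):

    #include <cstdio>
    #include "leveldb/env.h"

    // Hypothetical worker: Env::StartThread takes a plain function pointer
    // plus a void* argument and runs it on a detached thread.
    static void PrintTask(void* arg) {
      int* n = static_cast<int*>(arg);
      std::fprintf(stderr, "task got %d\n", *n);
    }

    void Demo() {
      static int value = 42;  // must outlive the detached thread
      leveldb::Env::Default()->StartThread(PrintTask, &value);
    }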

Foreground threads

The DBImpl interface:

    Status Put(const WriteOptions&, const Slice& key,
               const Slice& value) override;
    Status Delete(const WriteOptions&, const Slice& key) override;
    Status Write(const WriteOptions& options, WriteBatch* updates) override;
    Status Get(const ReadOptions& options, const Slice& key,
               std::string* value) override;

These methods may be called from multiple threads, i.e., the foreground is multi-threaded.

As stated at the top: foreground reads can run concurrently, but writes are serialized. A sketch of concurrent callers follows.
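A minimal sketch of multi-threaded foreground use (the path /tmp/demo_db and the keys are made up; error handling trimmed): several threads share one leveldb::DB* and issue Put/Get concurrently, and leveldb serializes the writes internally.

    #include <string>
    #include <thread>
    #include <vector>
    #include "leveldb/db.h"

    int main() {
      leveldb::DB* db;
      leveldb::Options options;
      options.create_if_missing = true;
      leveldb::Status s = leveldb::DB::Open(options, "/tmp/demo_db", &db);
      if (!s.ok()) return 1;

      std::vector<std::thread> threads;
      for (int i = 0; i < 4; i++) {
        threads.emplace_back([db, i] {
          std::string key = "key" + std::to_string(i);
          db->Put(leveldb::WriteOptions(), key, "value");  // serialized via writers_
          std::string value;
          db->Get(leveldb::ReadOptions(), key, &value);    // reads run concurrently
        });
      }
      for (auto& t : threads) t.join();
      delete db;
      return 0;
    }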

Get (read path)

Reads are multi-thread safe, i.e., the logic below can execute on multiple threads at once:

    // Unlock while reading from files and memtables
    {
      mutex_.Unlock();
      // First look in the memtable, then in the immutable memtable (if any).
      LookupKey lkey(key, snapshot);
      if (mem->Get(lkey, value, &s)) {
        // Done
      } else if (imm != nullptr && imm->Get(lkey, value, &s)) {
        // Done
      } else {
        s = current->Get(options, lkey, value, &stats);
        have_stat_update = true;
      }
      mutex_.Lock();
    }

Doesn't this race with writes?

  1. When reading a table file, a read could only conflict with a compaction. But sstables are immutable once written: a compaction reads a set of old files A and writes a new set B, and a concurrent foreground read only ever sees A. So foreground reads never race with compaction writes.
  2. When reading the memtable, the lock-free skiplist guarantees that readers always observe fully initialized nodes, so reads are safe against the concurrent writer.

The skiplist read path, i.e. SkipList<Key, Comparator>::Iterator::Seek(const Key& target)

Both the read path and the write path center on FindGreaterOrEqual. A read wants the node at (or after) the key; a write wants the same position plus the prev links at every level, so it can splice the new node into their next pointers.

    template <typename Key, class Comparator>
    inline void SkipList<Key, Comparator>::Iterator::Seek(const Key& target) {
      node_ = list_->FindGreaterOrEqual(target, nullptr);
    }

    template <typename Key, class Comparator>
    typename SkipList<Key, Comparator>::Node*
    SkipList<Key, Comparator>::FindGreaterOrEqual(const Key& key,
                                                  Node** prev) const {
      Node* x = head_;
      int level = GetMaxHeight() - 1;
      while (true) {
        Node* next = x->Next(level);
        if (KeyIsAfterNode(key, next)) {
          // Keep searching in this list
          x = next;
        } else {
          if (prev != nullptr) prev[level] = x;
          if (level == 0) {
            return next;
          } else {
            // Switch to next list
            level--;
          }
        }
      }
    }

    template <typename Key, class Comparator>
    struct SkipList<Key, Comparator>::Node {
      explicit Node(const Key& k) : key(k) {}

      Key const key;

      // Accessors/mutators for links.  Wrapped in methods so we can
      // add the appropriate barriers as necessary.
      Node* Next(int n) {
        assert(n >= 0);
        // Use an 'acquire load' so that we observe a fully initialized
        // version of the returned Node.
        return next_[n].load(std::memory_order_acquire);
      }
      void SetNext(int n, Node* x) {
        assert(n >= 0);
        // Use a 'release store' so that anybody who reads through this
        // pointer observes a fully initialized version of the inserted node.
        next_[n].store(x, std::memory_order_release);
      }

     private:
      // Array of length equal to the node height.  next_[0] is lowest level link.
      std::atomic<Node*> next_[1];
    };
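The acquire/release pairing above is the standard pointer-publication pattern. A self-contained sketch of the same idea (names here are illustrative, not leveldb's):

    #include <atomic>
    #include <cassert>

    struct Payload {
      int a = 0;
      int b = 0;
    };

    std::atomic<Payload*> slot{nullptr};

    void Writer() {
      Payload* p = new Payload{1, 2};            // fully initialize first...
      slot.store(p, std::memory_order_release);  // ...then publish the pointer
    }

    void Reader() {
      Payload* p = slot.load(std::memory_order_acquire);
      if (p != nullptr) {
        // The acquire load pairs with the release store, so the reader is
        // guaranteed to see the initialized fields, never a half-built node.
        assert(p->a == 1 && p->b == 2);
      }
    }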

The skiplist write path

    Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
      MemTableInserter inserter;
      inserter.sequence_ = WriteBatchInternal::Sequence(b);
      inserter.mem_ = memtable;
      return b->Iterate(&inserter);
    }

    class MemTableInserter : public WriteBatch::Handler {
     public:
      SequenceNumber sequence_;
      MemTable* mem_;

      void Put(const Slice& key, const Slice& value) override {
        mem_->Add(sequence_, kTypeValue, key, value);
        sequence_++;
      }
      void Delete(const Slice& key) override {
        mem_->Add(sequence_, kTypeDeletion, key, Slice());
        sequence_++;
      }
    };

    void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
                       const Slice& value) {
      // Format of an entry is concatenation of:
      //  key_size     : varint32 of internal_key.size()
      //  key bytes    : char[internal_key.size()]
      //  value_size   : varint32 of value.size()
      //  value bytes  : char[value.size()]
      size_t key_size = key.size();
      size_t val_size = value.size();
      size_t internal_key_size = key_size + 8;
      const size_t encoded_len = VarintLength(internal_key_size) +
                                 internal_key_size + VarintLength(val_size) +
                                 val_size;
      char* buf = arena_.Allocate(encoded_len);
      char* p = EncodeVarint32(buf, internal_key_size);
      std::memcpy(p, key.data(), key_size);
      p += key_size;
      EncodeFixed64(p, (s << 8) | type);
      p += 8;
      p = EncodeVarint32(p, val_size);
      std::memcpy(p, value.data(), val_size);
      assert(p + val_size == buf + encoded_len);
      table_.Insert(buf);
    }

    template <typename Key, class Comparator>
    void SkipList<Key, Comparator>::Insert(const Key& key) {
      // TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
      // here since Insert() is externally synchronized.
      Node* prev[kMaxHeight];
      Node* x = FindGreaterOrEqual(key, prev);

      // Our data structure does not allow duplicate insertion
      assert(x == nullptr || !Equal(key, x->key));

      int height = RandomHeight();
      if (height > GetMaxHeight()) {
        for (int i = GetMaxHeight(); i < height; i++) {
          prev[i] = head_;
        }
        // It is ok to mutate max_height_ without any synchronization
        // with concurrent readers.  A concurrent reader that observes
        // the new value of max_height_ will see either the old value of
        // new level pointers from head_ (nullptr), or a new value set in
        // the loop below.  In the former case the reader will
        // immediately drop to the next level since nullptr sorts after all
        // keys.  In the latter case the reader will use the new node.
        max_height_.store(height, std::memory_order_relaxed);
      }

      // Splice the new node in after prev[i] at every level.
      x = NewNode(key, height);
      for (int i = 0; i < height; i++) {
        // NoBarrier_SetNext() suffices since we will add a barrier when
        // we publish a pointer to "x" in prev[i].
        x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
        prev[i]->SetNext(i, x);
      }
    }
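As a worked example of the entry layout built by MemTable::Add (the key, value, and sequence number here are made up for illustration):

    // Worked example (made-up values): key = "foo", value = "bar",
    // sequence s = 5, type = kTypeValue (0x1).
    //
    //   internal_key_size = 3 + 8 = 11       -> varint32: 0x0B         (1 byte)
    //   key bytes         = 'f' 'o' 'o'                                (3 bytes)
    //   tag               = (5 << 8) | 1 = 0x0501
    //                       -> fixed64, little-endian:
    //                          01 05 00 00 00 00 00 00                 (8 bytes)
    //   value_size        = 3                -> varint32: 0x03         (1 byte)
    //   value bytes       = 'b' 'a' 'r'                                (3 bytes)
    //
    // encoded_len = 1 + 11 + 1 + 3 = 16 bytes; this buffer is the "Key"
    // handed to table_.Insert(buf).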

Write (write path)

Why does DBImpl::Write release the lock here? Because log_->AddRecord and WriteBatchInternal::InsertInto are in practice executed by only one thread at a time.

    mutex_.Unlock();
    status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
    bool sync_error = false;
    if (status.ok() && options.sync) {
      status = logfile_->Sync();
      if (!status.ok()) {
        sync_error = true;
      }
    }
    if (status.ok()) {
      status = WriteBatchInternal::InsertInto(write_batch, mem_);
    }
    mutex_.Lock();

The reason lies in the Writer queue. Each call to Write:

  1. Constructs a Writer (bound to mutex_, which backs its condition variable).
  2. Pushes the Writer onto the back of writers_.
  3. Waits until its Writer reaches the front of writers_ (or has already been marked done by another thread).

This guarantees that only one thread at a time is processing writers; only after it finishes and dequeues from writers_ can later threads proceed.

    Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
      Writer w(&mutex_);
      w.batch = updates;
      w.sync = options.sync;
      w.done = false;

      MutexLock l(&mutex_);
      writers_.push_back(&w);
      while (!w.done && &w != writers_.front()) {
        w.cv.Wait();
      }
      if (w.done) {
        return w.status;
      }
      // ... (the rest of the function is shown below)

    // The Writer struct; the mutex backs writer.cv.Wait().
    struct DBImpl::Writer {
      explicit Writer(port::Mutex* mu)
          : batch(nullptr), sync(false), done(false), cv(mu) {}

      Status status;
      WriteBatch* batch;
      bool sync;
      bool done;
      port::CondVar cv;
    };
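For completeness, the tail of DBImpl::Write (elided above; quoted from db/db_impl.cc, where last_writer is set by BuildBatchGroup, which lets the front writer absorb several queued batches) shows how the front writer marks the writers it handled as done and wakes the new head of the queue:

    while (true) {
      Writer* ready = writers_.front();
      writers_.pop_front();
      if (ready != &w) {
        ready->status = status;
        ready->done = true;
        ready->cv.Signal();
      }
      if (ready == last_writer) break;
    }

    // Notify new head of write queue
    if (!writers_.empty()) {
      writers_.front()->cv.Signal();
    }

    return status;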

Concurrency benchmarks

    void WriteRandom(ThreadState* thread) { DoWrite(thread, false); }

    void DoWrite(ThreadState* thread, bool seq) {
      if (num_ != FLAGS_num) {
        char msg[100];
        std::snprintf(msg, sizeof(msg), "(%d ops)", num_);
        thread->stats.AddMessage(msg);
      }

      RandomGenerator gen;
      WriteBatch batch;
      Status s;
      int64_t bytes = 0;
      KeyBuffer key;
      for (int i = 0; i < num_; i += entries_per_batch_) {
        batch.Clear();
        for (int j = 0; j < entries_per_batch_; j++) {
          const int k = seq ? i + j : thread->rand.Uniform(FLAGS_num);
          key.Set(k);
          batch.Put(key.slice(), gen.Generate(value_size_));
          bytes += value_size_ + key.slice().size();
          thread->stats.FinishedSingleOp();
        }
        s = db_->Write(write_options_, &batch);
        if (!s.ok()) {
          std::fprintf(stderr, "put error: %s\n", s.ToString().c_str());
          std::exit(1);
        }
      }
      thread->stats.AddBytes(bytes);
    }

    void ReadRandom(ThreadState* thread) {
      ReadOptions options;
      std::string value;
      int found = 0;
      KeyBuffer key;
      for (int i = 0; i < reads_; i++) {
        const int k = thread->rand.Uniform(FLAGS_num);
        key.Set(k);
        if (db_->Get(options, key.slice(), &value).ok()) {
          found++;
        }
        thread->stats.FinishedSingleOp();
      }
      char msg[100];
      std::snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_);
      thread->stats.AddMessage(msg);
    }

Multi-threaded execution, with n threads:

    void RunBenchmark(int n, Slice name,
                      void (Benchmark::*method)(ThreadState*)) {
      SharedState shared(n);

      ThreadArg* arg = new ThreadArg[n];
      for (int i = 0; i < n; i++) {
        arg[i].bm = this;
        arg[i].method = method;
        arg[i].shared = &shared;
        ++total_thread_count_;
        // Seed the thread's random state deterministically based upon thread
        // creation across all benchmarks. This ensures that the seeds are unique
        // but reproducible when rerunning the same set of benchmarks.
        arg[i].thread = new ThreadState(i, /*seed=*/1000 + total_thread_count_);
        arg[i].thread->shared = &shared;
        g_env->StartThread(ThreadBody, &arg[i]);
      }
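To drive these paths with multiple foreground threads, db_bench exposes the thread count as a flag (flag names as in benchmarks/db_bench.cc), e.g.:

    ./db_bench --benchmarks=fillrandom,readrandom --threads=4 --num=100000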