leveldb 可分为五大块。db, iterator,version, memtable+log, tablefile。

leveldb 的架构图资料上有很多,可以参考网站

db

主要接口Open,Put/Write,Delete,Get

Open

Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr)
Open 需要传入options,options记录了1. 限制参数,例如max_open_files,max_file_size,block_size 2. Env* env; env提供创建文件等接口

open的执行过程

  1. 利用manifest recover;impl->Recover(&edit, &save_manifest);
    1. 包含两部分1. versions_->Recover(save_manifest); 2. RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit, &max_sequence);
    2. 在dbdir 里找到logfile,根据文件名的序号排序,将排序后的logfile按序执行RecoverLogFile
  2. version和logfile replay后将状态记录给versionEdit,versionEdit的内容主要包括log_number_、last_sequence_、next_file_number_和new_files_(tablefile)。
  3. 最后执行VersionSet::LogAndApply,将VersionEdit SaveTo Version,然后把version append到versionSet

Write

Write的执行过程

  1. MakeRoomForWrite(updates == nullptr); 如果mem空间不够,将mem设置成imm,新建mem,触发dump
  2. WriteBatch* write_batch = BuildBatchGroup(&last_writer); 将writer放入到WriteBatchInternal::Append
  3. log_->AddRecord(WriteBatchInternal::Contents(write_batch));
  4. WriteBatchInternal::InsertInto(write_batch, mem_);
  5. DB::Write可以多线程执行,使用生产者消费者作为线程切换。线程同时可以是生产者,也可以是消费者。线程申请锁,把write放入队列,然后检查writer队列,如果有writer则执行具体write

Get

Get的执行过程

  1. 如果没有传入snapshot,设置snapshot为snapshot = versions_->LastSequence();
  2. LookupKey lkey(key, snapshot); lookup key包含了seq
  3. mem->Get(lkey, value, &s)
  4. imm->Get(lkey, value, &s)
  5. current->Get(options, lkey, value, &stats) (即Version::Get)
    1. 对L0的文件按照filenumber从大到小排序,遍历检索(L0 tablefile内部key有序,文件之间key无序, 文件有number顺序)
    2. 对L1以下的文件,每层使用二分查找找到指定的tablefile
    3. 对tablefile内部,首先使用TableCache::Get 尝试在cache中查找。cache是tablefile共享的,key是filenumber。如果tablefile没有cache,则读取文件插入cache。注意到tablecache只是pagecache上面的一层,tablecache 会cache住tablefile的metaindex_handle、index_block等元数据信息
    4. tablefile内的检索过程 1. rep_->index_block->NewIterator 迭代器seek index_block 2. Iterator* block_iter = BlockReader(this, options, iiter->value()); seek block

skiplist的元素顺序

  1. 元素格式| key_size | key | seq,type | value_size | value
  2. 从key部分之后开始排序
  3. 对于key相同的,包括delete标签,按照seq从大到小排序
  4. mem->Get 主要调用的是SkipList<Key, Comparator>::FindGreaterOrEqual,默认get key的seq是最近的seq,FindGreaterOrEqual会直接得到第一个key(也就是seq最大的key)。设置快照后get key会得到<=某seq的key。

Delete 和Write实现一致,key会有个标签标志该key是delete。

db检索元素使用的是迭代器的seek,例如
inline void SkipList<Key, Comparator>::Iterator::Seek(const Key& target)

迭代器

迭代器基本接口

  1. 检索void Seek(const Key& target);
  2. 获得前后元素void Next(); void Prev();
  3. 获得指向的节点
  4. 插入元素void Insert(const Key& key);

主要迭代器
SkipList::Iterator 用于操作mem和Imm table

TwoLevelIterator 用于检索tablefile,一次性需要把一个tablefile完全加载到内存

Block::Iter,操作tablefile的一个datablock,用于检索。1. 检索到RestartPoint 2. 从RestartPoint进一步检索得到key

MergingIterator 用于compaction。管理一层的若干tablefile,用来合并成为新的tablefile。采用多路败者树进行多个有序文件合并

只有skiplist需要插入修改,tablefile的修改只有append写。

version

versionEdit对应manifest的一条记录

version只记录了tablefile的分布,不记录redolog和skiplist部分。redolog和skiplist 使用seq记录版本。
seq+version构成了快照,一个快照在tablefile中对应一个版本,被快照引用的tablefile不会被回收(即使被compaction了)

恢复版本就是选择版本包含的tablefile进行管理;恢复快照还需要把redolog 全量恢复,其中log_number用来恢复memtable,prev_log_number用来恢复immutable。切换logfile时会向manifest写入新的log_number。

memtable+log

memtable就是skiplist,节点格式

| key_size | key | seq,type | value_size | value

log的记录是writebatch序列化的结果,对于write操作,格式

| kTypeValue, seq | key_size | key | value_size | value | …

对于delete操作,格式

| kTypeDeletion,seq | key_size | key |

tablefile

tablefile使用TableBuilder构建

tablefile没有header
格式

  1. data block
  2. Write filter block
  3. Write metaindex block
  4. Write index block
  5. Write footer

indexblock 每个元素是datablock的最后一个key

metaindex block记录除datablock, indexblock之外的block的位置,例如filter block(布隆过滤器)

footer
| metaindex_handle_ | index_handle_ | kTableMagicNumber |

metaindex_handle_,index_handle_ 记录了metaindex block和index block的位置

1
2
3
4
5
6
7
8
9
10
void Footer::EncodeTo(std::string* dst) const {
const size_t original_size = dst->size();
metaindex_handle_.EncodeTo(dst);
index_handle_.EncodeTo(dst);
dst->resize(2 * BlockHandle::kMaxEncodedLength); // Padding
PutFixed32(dst, static_cast<uint32_t>(kTableMagicNumber & 0xffffffffu));
PutFixed32(dst, static_cast<uint32_t>(kTableMagicNumber >> 32));
assert(dst->size() == original_size + kEncodedLength);
(void)original_size; // Disable unused variable warning.
}

借助单元测试分析源码

unittest是分析代码的有效入口,以db_test为例

case1 Empty

DBTest, Empty,尝试以不同option配置打开db
可选择的配置,{ kDefault, kReuse, kFilter, kUncompressed, kEnd }

执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
TEST_F(DBTest, Empty) {
do {
ASSERT_TRUE(db_ != nullptr);
ASSERT_EQ("NOT_FOUND", Get("foo"));
} while (ChangeOptions());
}

// Switch to a fresh database with the next option configuration to
// test. Return false if there are no more configurations to test.
bool ChangeOptions() {
option_config_++;
if (option_config_ >= kEnd) {
return false;
} else {
DestroyAndReopen();
return true;
}
}

  enum OptionConfig { kDefault, kReuse, kFilter, kUncompressed, kEnd };

void DestroyAndReopen(Options* options = nullptr) {
delete db_;
db_ = nullptr;
DestroyDB(dbname_, Options());
ASSERT_LEVELDB_OK(TryReopen(options));
}

// Status TryReopen(Options* options) 需要传入Option
Status TryReopen(Options* options) {
delete db_;
db_ = nullptr;
Options opts;
if (options != nullptr) {
opts = *options;
} else {
opts = CurrentOptions();
opts.create_if_missing = true;
}
last_options_ = opts;

return DB::Open(opts, dbname_, &db_);
}

cmake debug模式编译
cmake -DCMAKE_BUILD_TYPE=Debug ..

case02,03 EmptyKey, EmptyValue

不同option打开db,put key同时可以get到

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
TEST_F(DBTest, EmptyKey) {
do {
ASSERT_LEVELDB_OK(Put("", "v1"));
ASSERT_EQ("v1", Get(""));
ASSERT_LEVELDB_OK(Put("", "v2"));
ASSERT_EQ("v2", Get(""));
} while (ChangeOptions());
}

TEST_F(DBTest, EmptyValue) {
do {
ASSERT_LEVELDB_OK(Put("key", "v1"));
ASSERT_EQ("v1", Get("key"));
ASSERT_LEVELDB_OK(Put("key", ""));
ASSERT_EQ("", Get("key"));
ASSERT_LEVELDB_OK(Put("key", "v2"));
ASSERT_EQ("v2", Get("key"));
} while (ChangeOptions());
}

case04 ReadWrite

读写

1
2
3
4
5
6
7
8
9
10
TEST_F(DBTest, ReadWrite) {
do {
ASSERT_LEVELDB_OK(Put("foo", "v1"));
ASSERT_EQ("v1", Get("foo"));
ASSERT_LEVELDB_OK(Put("bar", "v2"));
ASSERT_LEVELDB_OK(Put("foo", "v3"));
ASSERT_EQ("v3", Get("foo"));
ASSERT_EQ("v2", Get("bar"));
} while (ChangeOptions());
}

case05 PutDeleteGet

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
TEST_F(DBTest, PutDeleteGet) {
do {
ASSERT_LEVELDB_OK(db_->Put(WriteOptions(), "foo", "v1"));
ASSERT_EQ("v1", Get("foo"));
ASSERT_LEVELDB_OK(db_->Put(WriteOptions(), "foo", "v2"));
ASSERT_EQ("v2", Get("foo"));
ASSERT_LEVELDB_OK(db_->Delete(WriteOptions(), "foo"));
ASSERT_EQ("NOT_FOUND", Get("foo"));
} while (ChangeOptions());
}

#define EXPECT_LEVELDB_OK(expression) \
EXPECT_THAT(expression, leveldb::test::IsOK())
#define ASSERT_LEVELDB_OK(expression) \
ASSERT_THAT(expression, leveldb::test::IsOK())

case06 GetFromImmutableLayer

核心是三行

env_->delay_data_sync_.store(true, std::memory_order_release);
Put(“k1”, std::string(100000, ‘x’)); // Fill memtable.
Put(“k2”, std::string(100000, ‘y’)); // Trigger compaction.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
TEST_F(DBTest, GetFromImmutableLayer) {
do {
Options options = CurrentOptions();
options.env = env_;
options.write_buffer_size = 100000; // Small write buffer
Reopen(&options);

ASSERT_LEVELDB_OK(Put("foo", "v1"));
ASSERT_EQ("v1", Get("foo"));

// Block sync calls.
env_->delay_data_sync_.store(true, std::memory_order_release);
Put("k1", std::string(100000, 'x')); // Fill memtable.
Put("k2", std::string(100000, 'y')); // Trigger compaction.
ASSERT_EQ("v1", Get("foo"));
// Release sync calls.
env_->delay_data_sync_.store(false, std::memory_order_release);
} while (ChangeOptions());
}

put key2时,1. 将mem变成imm 2. 后台触发compaction

如果delay_data_sync_ 设置true,后台compaction线程阻塞在L0->L1的DBImpl::WriteLevel0Table。imm_尚未被删除,foo还是会在imm_中读取

write_buffer_size 作用
DBImpl::MakeRoomForWrite 发现mem_->ApproximateMemoryUsage() > options_.write_buffer_size),认为没有空间,然后执行

  1. s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
  2. 设置Imm,imm_ = mem_;
    has_imm_.store(true, std::memory_order_release);
    mem_ = new MemTable(internal_comparator_);
  3. MaybeScheduleCompaction();

delay_data_sync_ 的作用,从取名上作用是延迟sync。其实就是datafile的sync()函数一直阻塞。

什么时候执行Sync()

  1. recover时,如果db不存在,执行DBImpl::NewDB()。执行manifest file->Sync();
  2.  DBImpl::FinishCompactionOutputFile 执行compact->outfile->Sync();
    1. FinishCompactionOutputFile函数在minor compaction和major compaction执行完之后都会执行
  3. DBImpl::Write 执行logfile_->Sync();
  4. DBImpl::WriteLevel0Table 执行BuildTable,最后执行s = file->Sync();

如果delay_data_sync_ 设置true,会卡在L0->L1的DBImpl::WriteLevel0Table。imm_尚未被删除,数据还是会在imm_中读取

MaybeScheduleCompaction 函数逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void DBImpl::MaybeScheduleCompaction() {
mutex_.AssertHeld();
if (background_compaction_scheduled_) {
// Already scheduled
} else if (shutting_down_.load(std::memory_order_acquire)) {
// DB is being deleted; no more background compactions
} else if (!bg_error_.ok()) {
// Already got an error; no more changes
} else if (imm_ == nullptr && manual_compaction_ == nullptr &&
!versions_->NeedsCompaction()) {
// No work to be done
} else {
background_compaction_scheduled_ = true;
env_->Schedule(&DBImpl::BGWork, this);
}
}

PosixEnv::Schedule
作为生产者

  1. 如果background_work_queue_为空,唤醒background_work_cv_(消费线程会重新申请锁
  2. 将任务加入到background_work_queue_,线程保护
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    void PosixEnv::Schedule(
    void (*background_work_function)(void* background_work_arg),
    void* background_work_arg) {
    background_work_mutex_.Lock();

    // Start the background thread, if we haven't done so already.
    if (!started_background_thread_) {
    started_background_thread_ = true;
    std::thread background_thread(PosixEnv::BackgroundThreadEntryPoint, this);
    background_thread.detach();
    }

    // If the queue is empty, the background thread may be waiting for work.
    if (background_work_queue_.empty()) {
    background_work_cv_.Signal();
    }

    background_work_queue_.emplace(background_work_function, background_work_arg);
    background_work_mutex_.Unlock();
    }

    port::Mutex background_work_mutex_;
    port::CondVar background_work_cv_ GUARDED_BY(background_work_mutex_);
    bool started_background_thread_ GUARDED_BY(background_work_mutex_);
    std::queue<BackgroundWorkItem> background_work_queue_
    GUARDED_BY(background_work_mutex_);
    GUARDED_BY 是一种在多线程编程中用于静态代码分析的注解(或宏),其核心作用是声明某个共享变量必须通过特定的锁(或其他同步机制)来保护,以避免数据竞争和并发安全问题。

BackgroundWorkItem

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
  // Stores the work item data in a Schedule() call.
//
// Instances are constructed on the thread calling Schedule() and used on the
// background thread.
//
// This structure is thread-safe beacuse it is immutable.
struct BackgroundWorkItem {
explicit BackgroundWorkItem(void (*function)(void* arg), void* arg)
: function(function), arg(arg) {}

void (*const function)(void*);
void* const arg;
};

void PosixEnv::BackgroundThreadMain() {
while (true) {
background_work_mutex_.Lock();

// Wait until there is work to be done.
while (background_work_queue_.empty()) {
background_work_cv_.Wait();
}

assert(!background_work_queue_.empty());
auto background_work_function = background_work_queue_.front().function;
void* background_work_arg = background_work_queue_.front().arg;
background_work_queue_.pop();

background_work_mutex_.Unlock();
background_work_function(background_work_arg);
}
}

启动,可以看到只有一个消费者线程

1
2
3
4
5
6
7
8
9
10
11
void PosixEnv::Schedule(
void (*background_work_function)(void* background_work_arg),
void* background_work_arg) {
background_work_mutex_.Lock();

// Start the background thread, if we haven't done so already.
if (!started_background_thread_) {
started_background_thread_ = true;
std::thread background_thread(PosixEnv::BackgroundThreadEntryPoint, this);
background_thread.detach();
}

case 08,GetFromVersions

1
2
3
4
5
6
7
TEST_F(DBTest, GetFromVersions) {
do {
ASSERT_LEVELDB_OK(Put("foo", "v1"));
dbfull()->TEST_CompactMemTable();
ASSERT_EQ("v1", Get("foo"));
} while (ChangeOptions()); // 会执行DestroyAndReopen();
}

dbfull()

1
  DBImpl* dbfull() { return reinterpret_cast<DBImpl*>(db_); }

DBImpl::TEST_CompactMemTable()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Status DBImpl::TEST_CompactMemTable() {
// nullptr batch means just wait for earlier writes to be done
Status s = Write(WriteOptions(), nullptr);
if (s.ok()) {
// Wait until the compaction completes
MutexLock l(&mutex_);
while (imm_ != nullptr && bg_error_.ok()) {
background_work_finished_signal_.Wait(); // 主动释放锁并进入阻塞状态
// CompactMemTable(); 执行完会调用
// background_work_finished_signal_.SignalAll(); 唤醒
}
if (imm_ != nullptr) {
s = bg_error_;
}
}
return s;
}

GetSnapshot

db_->GetSnapshot();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
TEST_F(DBTest, GetSnapshot) {
do {
// Try with both a short key and a long key
for (int i = 0; i < 2; i++) {
std::string key = (i == 0) ? std::string("foo") : std::string(200, 'x');
ASSERT_LEVELDB_OK(Put(key, "v1"));
const Snapshot* s1 = db_->GetSnapshot();
ASSERT_LEVELDB_OK(Put(key, "v2"));
ASSERT_EQ("v2", Get(key));
ASSERT_EQ("v1", Get(key, s1));
dbfull()->TEST_CompactMemTable();
ASSERT_EQ("v2", Get(key));
ASSERT_EQ("v1", Get(key, s1));
db_->ReleaseSnapshot(s1);
}
} while (ChangeOptions());
}

const Snapshot* DBImpl::GetSnapshot() {
MutexLock l(&mutex_);
return snapshots_.New(versions_->LastSequence());
}

// Creates a SnapshotImpl and appends it to the end of the list.
SnapshotImpl* New(SequenceNumber sequence_number) {
assert(empty() || newest()->sequence_number_ <= sequence_number);

SnapshotImpl* snapshot = new SnapshotImpl(sequence_number);

snapshot->next_ = &head_;
snapshot->prev_ = head_.prev_;
snapshot->prev_->next_ = snapshot;
snapshot->next_->prev_ = snapshot;
return snapshot;
}

DeletionMarkers1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
TEST_F(DBTest, DeletionMarkers1) {
Put("foo", "v1");
ASSERT_LEVELDB_OK(dbfull()->TEST_CompactMemTable());
const int last = config::kMaxMemCompactLevel;
ASSERT_EQ(NumTableFilesAtLevel(last), 1); // foo => v1 is now in last level

// Place a table at level last-1 to prevent merging with preceding mutation
Put("a", "begin");
Put("z", "end");
dbfull()->TEST_CompactMemTable();
ASSERT_EQ(NumTableFilesAtLevel(last), 1);
ASSERT_EQ(NumTableFilesAtLevel(last - 1), 1);

Delete("foo");
Put("foo", "v2");
ASSERT_EQ(AllEntriesFor("foo"), "[ v2, DEL, v1 ]");
ASSERT_LEVELDB_OK(dbfull()->TEST_CompactMemTable()); // Moves to level last-2
ASSERT_EQ(AllEntriesFor("foo"), "[ v2, DEL, v1 ]");
Slice z("z");
dbfull()->TEST_CompactRange(last - 2, nullptr, &z);
// DEL eliminated, but v1 remains because we aren't compacting that level
// (DEL can be eliminated because v2 hides v1).
ASSERT_EQ(AllEntriesFor("foo"), "[ v2, v1 ]");
dbfull()->TEST_CompactRange(last - 1, nullptr, nullptr);
// Merging last-1 w/ last, so we are the base level for "foo", so
// DEL is removed. (as is v1).
ASSERT_EQ(AllEntriesFor("foo"), "[ v2 ]");
}

Delete的实现

1
2
3
4
5
6
7
8
9
10
11
Status DB::Delete(const WriteOptions& opt, const Slice& key) {
WriteBatch batch;
batch.Delete(key);
return Write(opt, &batch);
}

void WriteBatch::Delete(const Slice& key) {
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeDeletion));
PutLengthPrefixedSlice(&rep_, key);
}

AllEntriesFor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
std::string AllEntriesFor(const Slice& user_key) {
Iterator* iter = dbfull()->TEST_NewInternalIterator();
InternalKey target(user_key, kMaxSequenceNumber, kTypeValue);
//
iter->Seek(target.Encode());
std::string result;
if (!iter->status().ok()) {
result = iter->status().ToString();
} else {
result = "[ ";
bool first = true;
while (iter->Valid()) {
ParsedInternalKey ikey;
if (!ParseInternalKey(iter->key(), &ikey)) {
result += "CORRUPTED";
} else {
if (last_options_.comparator->Compare(ikey.user_key, user_key) != 0) {
break;
}
if (!first) {
result += ", ";
}
first = false;
switch (ikey.type) {
case kTypeValue:
result += iter->value().ToString();
break;
case kTypeDeletion:
result += "DEL";
break;
}
}
iter->Next();
}
if (!first) {
result += " ";
}
result += "]";
}
delete iter;
return result;
}

Iterator* iter = dbfull()->TEST_NewInternalIterator();

InternalKey target(user_key, kMaxSequenceNumber, kTypeValue);
//
iter->Seek(target.Encode());

这里的target 封装了seq和type

Iterator::Seek

SSTable 文件由数据块和索引块组成。索引块存储了数据块的元信息,比如每个数据块的起始键和位置。当需要查找某个键时,首先通过索引块找到对应的数据块,然后在数据块内进行查找。TwoLevelIterator负责这种两级查找过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
template <typename Key, class Comparator>
inline void SkipList<Key, Comparator>::Iterator::Seek(const Key& target) {
node_ = list_->FindGreaterOrEqual(target, nullptr);
}
// 负责sst file的遍历
void TwoLevelIterator::Seek(const Slice& target) {
// seek索引块
index_iter_.Seek(target);
InitDataBlock();
if (data_iter_.iter() != nullptr) data_iter_.Seek(target);
SkipEmptyDataBlocksForward();
}

class MergingIterator : public Iterator {
void Seek(const Slice& target) override {
// 每个children是一个SkipList::Iterator或者TwoLevelIterator
for (int i = 0; i < n_; i++) {
children_[i].Seek(target);
}
FindSmallest();
direction_ = kForward;
}

void Next() override {
assert(Valid());

// Ensure that all children are positioned after key().
// If we are moving in the forward direction, it is already
// true for all of the non-current_ children since current_ is
// the smallest child and key() == current_->key(). Otherwise,
// we explicitly position the non-current_ children.
if (direction_ != kForward) {
for (int i = 0; i < n_; i++) {
IteratorWrapper* child = &children_[i];
if (child != current_) {
child->Seek(key());
if (child->Valid() &&
comparator_->Compare(key(), child->key()) == 0) {
child->Next();
}
}
}
direction_ = kForward;
}

current_->Next();
FindSmallest();

private:
// Which direction is the iterator moving?
enum Direction { kForward, kReverse };

void FindSmallest();
void FindLargest();

// We might want to use a heap in case there are lots of children.
// For now we use a simple array since we expect a very small number
// of children in leveldb.
const Comparator* comparator_;
IteratorWrapper* children_;
int n_;
IteratorWrapper* current_;
Direction direction_;
};

class InternalKey {
private:
std::string rep_;

InternalKey() {} // Leave rep_ as empty to indicate it is invalid
InternalKey(const Slice& user_key, SequenceNumber s, ValueType t) {
AppendInternalKey(&rep_, ParsedInternalKey(user_key, s, t));
}

TODO

迭代器具体使用

详细线程模型和并发控制

模块依赖和编译依赖

性能分析