TEST_F(DBTest, Empty) {
  do {
    ASSERT_TRUE(db_ != nullptr);
    ASSERT_EQ("NOT_FOUND", Get("foo"));
  } while (ChangeOptions());
}
// Switch to a fresh database with the next option configuration to
// test.  Return false if there are no more configurations to test.
bool ChangeOptions() {
  option_config_++;
  if (option_config_ >= kEnd) {
    return false;
  } else {
    DestroyAndReopen();
    return true;
  }
}
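For context, the kEnd sentinel comes from the list of option configurations that db_test.cc cycles through. In recent leveldb trees the enum looks roughly like this (quoted from memory, so treat it as a sketch; names may differ across versions):

// Sequence of option configurations to try (sketch of db_test.cc).
enum OptionConfig {
  kDefault = 0,
  kReuse = 1,         // reuse_logs = true
  kFilter = 2,        // with a Bloom filter policy
  kUncompressed = 3,  // compression disabled
  kEnd = 4
};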
void DBImpl::MaybeScheduleCompaction() {
  mutex_.AssertHeld();
  if (background_compaction_scheduled_) {
    // Already scheduled
  } else if (shutting_down_.load(std::memory_order_acquire)) {
    // DB is being deleted; no more background compactions
  } else if (!bg_error_.ok()) {
    // Already got an error; no more changes
  } else if (imm_ == nullptr && manual_compaction_ == nullptr &&
             !versions_->NeedsCompaction()) {
    // No work to be done
  } else {
    background_compaction_scheduled_ = true;
    env_->Schedule(&DBImpl::BGWork, this);
  }
}
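The &DBImpl::BGWork passed to Schedule() is a thin trampoline back into the DB object. Roughly, from db_impl.cc (quoted from memory, so details may vary between versions):

void DBImpl::BGWork(void* db) {
  reinterpret_cast<DBImpl*>(db)->BackgroundCall();
}

void DBImpl::BackgroundCall() {
  MutexLock l(&mutex_);
  assert(background_compaction_scheduled_);
  if (shutting_down_.load(std::memory_order_acquire)) {
    // No more background work when shutting down.
  } else if (!bg_error_.ok()) {
    // No more background work after a background error.
  } else {
    BackgroundCompaction();
  }
  background_compaction_scheduled_ = false;

  // The previous compaction may have produced too many files in some
  // level, so reschedule another compaction if needed.
  MaybeScheduleCompaction();
  background_work_finished_signal_.SignalAll();
}

The trailing SignalAll() is what wakes up TEST_CompactMemTable() below.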
// Start the background thread, if we haven't done so already.
if (!started_background_thread_) {
  started_background_thread_ = true;
  std::thread background_thread(PosixEnv::BackgroundThreadEntryPoint, this);
  background_thread.detach();
}
// If the queue is empty, the background thread may be waiting for work.
if (background_work_queue_.empty()) {
  background_work_cv_.Signal();
}
// Stores the work item data in a Schedule() call.
//
// Instances are constructed on the thread calling Schedule() and used on the
// background thread.
//
// This structure is thread-safe because it is immutable.
struct BackgroundWorkItem {
  explicit BackgroundWorkItem(void (*function)(void* arg), void* arg)
      : function(function), arg(arg) {}

  void (* const function)(void*);
  void* const arg;
};
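On the consumer side, the detached thread loops forever pulling items off the queue. A sketch of PosixEnv::BackgroundThreadMain(), reconstructed from env_posix.cc (field names are best-effort):

void PosixEnv::BackgroundThreadMain() {
  while (true) {
    background_work_mutex_.Lock();

    // Wait until there is work to be done.
    while (background_work_queue_.empty()) {
      background_work_cv_.Wait();
    }

    auto background_work_function = background_work_queue_.front().function;
    void* background_work_arg = background_work_queue_.front().arg;
    background_work_queue_.pop();

    // Run the work item outside the lock so Schedule() is never blocked
    // behind a long compaction.
    background_work_mutex_.Unlock();
    background_work_function(background_work_arg);
  }
}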
case 08: GetFromVersions
TEST_F(DBTest, GetFromVersions) {
  do {
    ASSERT_LEVELDB_OK(Put("foo", "v1"));
    dbfull()->TEST_CompactMemTable();
    ASSERT_EQ("v1", Get("foo"));
  } while (ChangeOptions());  // each iteration runs DestroyAndReopen()
}
Status DBImpl::TEST_CompactMemTable() {
  // nullptr batch means just wait for earlier writes to be done
  Status s = Write(WriteOptions(), nullptr);
  if (s.ok()) {
    // Wait until the compaction completes
    MutexLock l(&mutex_);
    while (imm_ != nullptr && bg_error_.ok()) {
      // Wait() releases the mutex and blocks.  Once the background thread
      // finishes CompactMemTable() it calls
      // background_work_finished_signal_.SignalAll() to wake us up.
      background_work_finished_signal_.Wait();
    }
    if (imm_ != nullptr) {
      s = bg_error_;
    }
  }
  return s;
}
TEST_F(DBTest, GetSnapshot) {
  do {
    // Try with both a short key and a long key
    for (int i = 0; i < 2; i++) {
      std::string key = (i == 0) ? std::string("foo") : std::string(200, 'x');
      ASSERT_LEVELDB_OK(Put(key, "v1"));
      const Snapshot* s1 = db_->GetSnapshot();
      ASSERT_LEVELDB_OK(Put(key, "v2"));
      ASSERT_EQ("v2", Get(key));
      ASSERT_EQ("v1", Get(key, s1));
      dbfull()->TEST_CompactMemTable();
      ASSERT_EQ("v2", Get(key));
      ASSERT_EQ("v1", Get(key, s1));
      db_->ReleaseSnapshot(s1);
    }
  } while (ChangeOptions());
}
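What makes the old value visible through s1 is that DBImpl::Get() clamps the read to the snapshot's sequence number instead of the latest one. Roughly, from db_impl.cc (a sketch from memory):

// Inside DBImpl::Get(): pick the sequence number the read runs at.
SequenceNumber snapshot;
if (options.snapshot != nullptr) {
  snapshot =
      static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
} else {
  snapshot = versions_->LastSequence();
}

Entries newer than this sequence number are simply skipped during the lookup, so "v1" stays reachable until ReleaseSnapshot(s1).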
// Creates a SnapshotImpl and appends it to the end of the list.
SnapshotImpl* New(SequenceNumber sequence_number) {
  assert(empty() || newest()->sequence_number_ <= sequence_number);
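  // The remainder of New() (reconstructed from snapshot.h; not part of the
  // excerpt above) splices the new snapshot in just before head_, i.e. at
  // the end of the circular doubly-linked list, keeping the list sorted by
  // ascending sequence number:
  SnapshotImpl* snapshot = new SnapshotImpl(sequence_number);

  snapshot->list_ = this;  // upstream guards this with #if !defined(NDEBUG)
  snapshot->next_ = &head_;
  snapshot->prev_ = head_.prev_;
  snapshot->prev_->next_ = snapshot;
  snapshot->next_->prev_ = snapshot;
  return snapshot;
}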
TEST_F(DBTest, DeletionMarkers1) {
  Put("foo", "v1");
  ASSERT_LEVELDB_OK(dbfull()->TEST_CompactMemTable());
  const int last = config::kMaxMemCompactLevel;
  ASSERT_EQ(NumTableFilesAtLevel(last), 1);  // foo => v1 is now in last level

  // Place a table at level last-1 to prevent merging with preceding mutation
  Put("a", "begin");
  Put("z", "end");
  dbfull()->TEST_CompactMemTable();
  ASSERT_EQ(NumTableFilesAtLevel(last), 1);
  ASSERT_EQ(NumTableFilesAtLevel(last - 1), 1);

  Delete("foo");
  Put("foo", "v2");
  ASSERT_EQ(AllEntriesFor("foo"), "[ v2, DEL, v1 ]");
  ASSERT_LEVELDB_OK(dbfull()->TEST_CompactMemTable());  // Moves to level last-2
  ASSERT_EQ(AllEntriesFor("foo"), "[ v2, DEL, v1 ]");
  Slice z("z");
  dbfull()->TEST_CompactRange(last - 2, nullptr, &z);
  // DEL eliminated, but v1 remains because we aren't compacting that level
  // (DEL can be eliminated because v2 hides v1).
  ASSERT_EQ(AllEntriesFor("foo"), "[ v2, v1 ]");
  dbfull()->TEST_CompactRange(last - 1, nullptr, nullptr);
  // Merging last-1 w/ last, so we are the base level for "foo", so
  // DEL is removed.  (as is v1).
  ASSERT_EQ(AllEntriesFor("foo"), "[ v2 ]");
}
class MergingIterator : public Iterator {
  void Seek(const Slice& target) override {
    // Each child is a SkipList::Iterator or a TwoLevelIterator.
    for (int i = 0; i < n_; i++) {
      children_[i].Seek(target);
    }
    FindSmallest();
    direction_ = kForward;
  }

  void Next() override {
    assert(Valid());

    // Ensure that all children are positioned after key().
    // If we are moving in the forward direction, it is already
    // true for all of the non-current_ children since current_ is
    // the smallest child and key() == current_->key().  Otherwise,
    // we explicitly position the non-current_ children.
    if (direction_ != kForward) {
      for (int i = 0; i < n_; i++) {
        IteratorWrapper* child = &children_[i];
        if (child != current_) {
          child->Seek(key());
          if (child->Valid() &&
              comparator_->Compare(key(), child->key()) == 0) {
            child->Next();
          }
        }
      }
      direction_ = kForward;
    }

    current_->Next();
    FindSmallest();
  }

 private:
  // Which direction is the iterator moving?
  enum Direction { kForward, kReverse };

  void FindSmallest();
  void FindLargest();

  // We might want to use a heap in case there are lots of children.
  // For now we use a simple array since we expect a very small number
  // of children in leveldb.
  const Comparator* comparator_;
  IteratorWrapper* children_;
  int n_;
  IteratorWrapper* current_;
  Direction direction_;
};
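FindSmallest() is exactly the linear scan the comment above alludes to: it walks all children and points current_ at the one with the smallest key. A sketch based on merger.cc:

void MergingIterator::FindSmallest() {
  IteratorWrapper* smallest = nullptr;
  for (int i = 0; i < n_; i++) {
    IteratorWrapper* child = &children_[i];
    if (child->Valid()) {
      if (smallest == nullptr ||
          comparator_->Compare(child->key(), smallest->key()) < 0) {
        smallest = child;
      }
    }
  }
  current_ = smallest;  // nullptr once every child is exhausted
}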
class InternalKey {
 private:
  std::string rep_;

 public:
  InternalKey() {}  // Leave rep_ as empty to indicate it is invalid
  InternalKey(const Slice& user_key, SequenceNumber s, ValueType t) {
    AppendInternalKey(&rep_, ParsedInternalKey(user_key, s, t));
  }
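For reference, AppendInternalKey() concatenates the user key with a packed 8-byte trailer of (sequence << 8 | type). A sketch based on dbformat.cc:

static uint64_t PackSequenceAndType(uint64_t seq, ValueType t) {
  assert(seq <= kMaxSequenceNumber);
  assert(t <= kValueTypeForSeek);
  return (seq << 8) | t;
}

void AppendInternalKey(std::string* result, const ParsedInternalKey& key) {
  result->append(key.user_key.data(), key.user_key.size());
  PutFixed64(result, PackSequenceAndType(key.sequence, key.type));
}

Because the sequence number is stored in the high 56 bits (and compared in descending order by the internal comparator), newer entries for the same user key sort before older ones.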