// Start the background thread, if we haven't done so already. if (!started_background_thread_) { started_background_thread_ = true; std::thread background_thread(PosixEnv::BackgroundThreadEntryPoint, this); background_thread.detach(); }
// If the queue is empty, the background thread may be waiting for work. if (background_work_queue_.empty()) { background_work_cv_.Signal(); }
// Accessors/mutators for links. Wrapped in methods so we can // add the appropriate barriers as necessary. Node* Next(int n){ assert(n >= 0); // Use an 'acquire load' so that we observe a fully initialized // version of the returned Node. return next_[n].load(std::memory_order_acquire); } voidSetNext(int n, Node* x){ assert(n >= 0); // Use a 'release store' so that anybody who reads through this // pointer observes a fully initialized version of the inserted node. next_[n].store(x, std::memory_order_release); } private: // Array of length equal to the node height. next_[0] is lowest level link. std::atomic<Node*> next_[1]; };
voidMemTable::Add(SequenceNumber s, ValueType type, const Slice& key, const Slice& value){ // Format of an entry is concatenation of: // key_size : varint32 of internal_key.size() // key bytes : char[internal_key.size()] // value_size : varint32 of value.size() // value bytes : char[value.size()] size_t key_size = key.size(); size_t val_size = value.size(); size_t internal_key_size = key_size + 8; constsize_t encoded_len = VarintLength(internal_key_size) + internal_key_size + VarintLength(val_size) + val_size; char* buf = arena_.Allocate(encoded_len); char* p = EncodeVarint32(buf, internal_key_size); std::memcpy(p, key.data(), key_size); p += key_size; EncodeFixed64(p, (s << 8) | type); p += 8; p = EncodeVarint32(p, val_size); std::memcpy(p, value.data(), val_size); assert(p + val_size == buf + encoded_len); table_.Insert(buf); }
// Inserts key into the list.
// Calls must be externally synchronized (see the TODO below), and key must
// not compare equal to an entry already in the list (asserted below).
template <typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key& key) {
  // TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
  // here since Insert() is externally synchronized.
  Node* prev[kMaxHeight];
  Node* x = FindGreaterOrEqual(key, prev);

  // Our data structure does not allow duplicate insertion
  assert(x == nullptr || !Equal(key, x->key));

  int height = RandomHeight();
  if (height > GetMaxHeight()) {
    // Levels between the current maximum and the new height use head_ as
    // the predecessor of the new node.
    for (int i = GetMaxHeight(); i < height; i++) {
      prev[i] = head_;
    }
    // It is ok to mutate max_height_ without any synchronization
    // with concurrent readers.  A concurrent reader that observes
    // the new value of max_height_ will see either the old value of
    // new level pointers from head_ (nullptr), or a new value set in
    // the loop below.  In the former case the reader will
    // immediately drop to the next level since nullptr sorts after all
    // keys.  In the latter case the reader will use the new node.
    max_height_.store(height, std::memory_order_relaxed);
  }

  // Link the new node in as the successor of prev[i] at each of its levels.
  x = NewNode(key, height);
  for (int i = 0; i < height; i++) {
    // NoBarrier_SetNext() suffices since we will add a barrier when
    // we publish a pointer to "x" in prev[i].
    x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
    prev[i]->SetNext(i, x);
  }
}
RandomGenerator gen; WriteBatch batch; Status s; int64_t bytes = 0; KeyBuffer key; for (int i = 0; i < num_; i += entries_per_batch_) { batch.Clear(); for (int j = 0; j < entries_per_batch_; j++) { constint k = seq ? i + j : thread->rand.Uniform(FLAGS_num); key.Set(k); batch.Put(key.slice(), gen.Generate(value_size_)); bytes += value_size_ + key.slice().size(); thread->stats.FinishedSingleOp(); } s = db_->Write(write_options_, &batch); if (!s.ok()) { std::fprintf(stderr, "put error: %s\n", s.ToString().c_str()); std::exit(1); } } thread->stats.AddBytes(bytes); }
读
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
voidReadRandom(ThreadState* thread){ ReadOptions options; std::string value; int found = 0; KeyBuffer key; for (int i = 0; i < reads_; i++) { constint k = thread->rand.Uniform(FLAGS_num); key.Set(k); if (db_->Get(options, key.slice(), &value).ok()) { found++; } thread->stats.FinishedSingleOp(); } char msg[100]; std::snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_); thread->stats.AddMessage(msg); }
多线程执行：线程数量为 n
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
voidRunBenchmark(int n, Slice name, void (Benchmark::*method)(ThreadState*)){ SharedState shared(n);
ThreadArg* arg = new ThreadArg[n]; for (int i = 0; i < n; i++) { arg[i].bm = this; arg[i].method = method; arg[i].shared = &shared; ++total_thread_count_; // Seed the thread's random state deterministically based upon thread // creation across all benchmarks. This ensures that the seeds are unique // but reproducible when rerunning the same set of benchmarks. arg[i].thread = newThreadState(i, /*seed=*/1000 + total_thread_count_); arg[i].thread->shared = &shared; g_env->StartThread(ThreadBody, &arg[i]); }