DBIter·
Leveldb数据库的MemTable和sstable文件的存储格式都是InternalKey(userkey, seq, type) => uservalue。
DBIter把同一个userkey在DB中的多条记录合并为一条,综合考虑了userkey的序号、删除标记、和写覆盖等等因素 。DBIter只会把userkey最新(seq最大的就是最新的,相同userkey的老记录(seq较小的)不会让上层看到)的一条记录展现给用户,另外如果这条最新的记录是删除类型,则会跳过该记录,否则,遍历时会把已删除的key列举出来。
1. 创建DBIterator·
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 Iterator* DBImpl::NewIterator (const ReadOptions& options) { SequenceNumber latest_snapshot; uint32_t seed; Iterator* iter = NewInternalIterator (options, &latest_snapshot, &seed); return NewDBIterator (this , user_comparator (), iter, (options.snapshot != nullptr ? static_cast <const SnapshotImpl*>(options.snapshot) ->sequence_number () : latest_snapshot), seed); } Iterator* NewDBIterator (DBImpl* db, const Comparator* user_key_comparator, Iterator* internal_iter, SequenceNumber sequence, uint32_t seed) { return new DBIter (db, user_key_comparator, internal_iter, sequence, seed); }
2. DBIterator的基本定义·
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 class DBIter : public Iterator { enum Direction { kForward, kReverse }; private : DBImpl* db_; const Comparator* const user_comparator_; Iterator* const iter_; SequenceNumber const sequence_; Status status_; std::string saved_key_; std::string saved_value_; Direction direction_; bool valid_; Random rnd_; ssize_t bytes_counter_; }
后面再说DBIter的作用,这里先介绍下sequence_的作用。
值得一提的sequence_·
在创建DBIter时,我们将系统的一个snapshot的sequence_number传递给了DBIter:
1 2 3 4 5 6 return NewDBIterator (this , user_comparator (), iter, (options.snapshot != nullptr ? static_cast <const SnapshotImpl*>(options.snapshot) ->sequence_number () : latest_snapshot), seed);
那它代表什么含义?
我们从一次WriteBatch出发,看看序列号是怎么使用及修改的。先给出结论图:
**我们知道一个WriteBatch对一次批量写操作的封装。**先看看一个WriteBatch的结构:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 class LEVELDB_EXPORT WriteBatch { public : class LEVELDB_EXPORT Handler { public : virtual ~Handler (); virtual void Put (const Slice& key, const Slice& value) = 0 ; virtual void Delete (const Slice& key) = 0 ; }; ... WriteBatch (); private : friend class WriteBatchInternal ; std::string rep_; };
其内部只有一个rep_的成员变量,其结构如下:
由8字节序列号,4字节count,和实际数据record数组组成。
一次Put操作:
1 2 3 4 5 6 void WriteBatch::Put (const Slice& key, const Slice& value) { WriteBatchInternal::SetCount (this , WriteBatchInternal::Count (this ) + 1 ); rep_.push_back (static_cast <char >(kTypeValue)); PutLengthPrefixedSlice (&rep_, key); PutLengthPrefixedSlice (&rep_, value); }
Put操作很简单,设置count+1, 然后将key value append 到rep_末尾。
这里我们知道了count代表rep_中有效的kv pair数量。
有了batch,我们如何应用batch(即如何将batch写入到leveldb系统中?)。对于一次写,除了写入到Log中来保证持久性以外,我们首先做的就是将数据插入到memtable中。这个过程如下函数:
注意,这里终于要说到seequence_的意义了
1 2 3 4 5 Status DBImpl::Write (const WriteOptions& options, WriteBatch* updates) { ... status = WriteBatchInternal::InsertInto (write_batch, mem_); ... }
1 2 3 4 5 6 7 8 9 10 Status WriteBatchInternal::InsertInto (const WriteBatch* b, MemTable* memtable) { MemTableInserter inserter; inserter.sequence_ = WriteBatchInternal::Sequence (b); inserter.mem_ = memtable; return b->Iterate (&inserter); } SequenceNumber WriteBatchInternal::Sequence (const WriteBatch* b) { return SequenceNumber (DecodeFixed64 (b->rep_.data ())); }
现在在MemTableInserter中保存了一个 batch最开始的sequence number。(至于batch的sequence number是在哪儿初始化的,我们之后再说)。
再回到问题,如何将batch中的多个kv pair应用到系统中?核心就在b->Iterate(&inserter)中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 Status WriteBatch::Iterate (Handler* handler) const { ... while (!input.empty ()) { ... switch (tag) { case kTypeValue: if (GetLengthPrefixedSlice (&input, &key) && GetLengthPrefixedSlice (&input, &value)) { handler->Put (key, value); } else { return Status::Corruption ("bad WriteBatch Put" ); } break ; case kTypeDeletion: if (GetLengthPrefixedSlice (&input, &key)) { handler->Delete (key); } else { return Status::Corruption ("bad WriteBatch Delete" ); } break ; default : return Status::Corruption ("unknown WriteBatch tag" ); } } if (found != WriteBatchInternal::Count (this )) { return Status::Corruption ("WriteBatch has wrong count" ); } else { return Status::OK (); } }
注意到这就是一个循环遍历batch中的各kv pair。然后调用handler->Put或handler->Delete来应用。而handler就是MemTableInserter:
1 2 3 4 5 6 7 8 void Put (const Slice& key, const Slice& value) override { mem_->Add (sequence_, kTypeValue, key, value); sequence_++; } void Delete (const Slice& key) override { mem_->Add (sequence_, kTypeDeletion, key, Slice ()); sequence_++; }
从这里终于可以看出,每插入一对kv,就会递增一次sequence
现在还剩最后一个问题,batch的sequence number是如何初始化的?
在WriteBatchInternal类中有个SetSequence函数:
1 2 3 void WriteBatchInternal::SetSequence (WriteBatch* b, SequenceNumber seq) { EncodeFixed64 (&b->rep_[0 ], seq); }
在这里,将rep_的前8个字节设置为这个batch的起始sequence。那谁调用的这个函数?
在DBImpl::Wirte函数中:
1 2 3 4 5 6 7 8 9 Status DBImpl::Write (const WriteOptions& options, WriteBatch* updates) { ... Status status = MakeRoomForWrite (updates == nullptr ); uint64_t last_sequence = versions_->LastSequence (); Writer* last_writer = &w; if (status.ok () && updates != nullptr ) { WriteBatch* write_batch = BuildBatchGroup (&last_writer); WriteBatchInternal::SetSequence (write_batch, last_sequence + 1 );
当成这次batch的写入后,又会反过来更新version中的序列号:
1 2 3 4 5 6 last_sequence += WriteBatchInternal::Count (write_batch); ... if (write_batch == tmp_batch_) tmp_batch_->Clear ();versions_->SetLastSequence (last_sequence);
所以,究其根本,系统中插入的每对kv的序列号,最初是由当前系统version的序列号指定的,然后在后序插入的kv中逐渐递增。
有了上面对sequence的理解,我们就能更好的理解DBIter的作用了。
DBIter的作用(功能)·
iter_是由NewInternalIterator创建的一个MergingIterator ,通过MergingIterator可以实现多个有序数据集合的归并操作。其中包含多个child iterator组成的集合。对MergingIterator的遍历会有序的遍历其child iterator中的每个元素。因为iter_遍历的是数据库的每一条记录。它是以InternalKey(userkey, seq, type)为遍历粒度的,只要InternalKey中任意一个组成元素不同,MergingIterator就认为他们是不同的kv对。
而**DBIter是以userkey为遍历粒度的,只要记录的userkey相同,那么DBIter就认为他们是一条记录(不同版本),sqe越大代表该记录越新。每次迭代将跳到下一个不同userkey的记录,且DBIter在遍历一个InternalKey时仅会检索InternalKey->seq 小于 DBIter创建时所初始化的seq号。 **举个例子:
上面表示了6个InternalKey,冒号前为user_key,冒号后为序列号。现假设创建DBIter时,所初始化的seq为2. 则DBIter在从前往后遍历时,将会直接跳过 key1:6,key1:5,key1:4 和key2:3. 只会从key2:2开始遍历。
下面我们来分析DBIter的各个操作。
3.FindNextUserEntry·
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 void DBIter::FindNextUserEntry (bool skipping, std::string* skip) { assert (iter_->Valid ()); assert (direction_ == kForward); do { ParsedInternalKey ikey; if (ParseKey (&ikey) && ikey.sequence <= sequence_) { switch (ikey.type) { case kTypeDeletion: SaveKey (ikey.user_key, skip); skipping = true ; break ; case kTypeValue: if (skipping && user_comparator_->Compare (ikey.user_key, *skip) <= 0 ) { } else { valid_ = true ; saved_key_.clear (); return ; } break ; } } iter_->Next (); } while (iter_->Valid ()); saved_key_.clear (); valid_ = false ; }
用一个例子来解释这个函数的作用:
上图中, 每个节点由3个字段组成,由冒号:分隔,第1个为user_key,第2个为版本号,第3个为节点类型(1代表普通值,0代表删除)。
假设当前要遍历所有key,且iter_目前指向 key1:6:1. 则:
iter往下走到key1:5:1, 由于key1::5:1的user_key = key1:6:1的user_key即:
1 2 3 4 if (skipping && user_comparator_->Compare (ikey.user_key, *skip) <= 0 ) { }
所以跳过这个节点。
iter往下走到key1:4:1, 由于key1::4:1的user_key = key1:6:1的user_key,所以跳过。
iter往下走到key2:3:0, 由于节点类型是0,删除节点。所以:
1 2 3 4 5 6 case kTypeDeletion: SaveKey (ikey.user_key, skip); skipping = true ; break ;
注意之类设置了skipping
iter往下走到key2:2:1,执行到:
1 2 3 4 if (skipping && user_comparator_->Compare (ikey.user_key, *skip) <= 0 ) { }
所以跳过。
后面同理。
这次遍历的结果是,只得到了key:1:6:1(注意这是所有key1的节点中最新的节点),所有key2都跳过了。
4.Seek·
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 void DBIter::Seek (const Slice& target) { direction_ = kForward; ClearSavedValue (); saved_key_.clear (); AppendInternalKey (&saved_key_, ParsedInternalKey (target, sequence_, kValueTypeForSeek)); iter_->Seek (saved_key_); if (iter_->Valid ()) { FindNextUserEntry (false , &saved_key_ ); } else { valid_ = false ; } }
5. Next·
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 void DBIter::Next () { ... SaveKey (ExtractUserKey (iter_->key ()), &saved_key_); iter_->Next (); if (!iter_->Valid ()) { valid_ = false ; saved_key_.clear (); return ; } FindNextUserEntry (true , &saved_key_); }
还是用例子来说:
假设当前iter_ 指向 key0:8:1。则Next函数的工作如下:
在saved_key_中保存 key0:8:1
iter指向key0:7:1
执行FindNextUserEntry
由于指定了FindNextUserEntry中的skipping为true,并且key0:7:1的user key = key0:8:1的user key,所以直接跳过key0:7:1。
现在iter来到了key1:6:1,有效,直接返回。
6. FindPrevUserEntry·
向前遍历和向后遍历基本类似,但是由于是越靠前越新,所以向前遍历时,需要多往前尝试,直到找到一个新的不同的user_key.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 void DBIter::FindPrevUserEntry () { assert (direction_ == kReverse); ValueType value_type = kTypeDeletion; if (iter_->Valid ()) { do { ParsedInternalKey ikey; if (ParseKey (&ikey) && ikey.sequence <= sequence_) { if ((value_type != kTypeDeletion) && user_comparator_->Compare (ikey.user_key, saved_key_) < 0 ) { break ; } value_type = ikey.type; if (value_type == kTypeDeletion) { saved_key_.clear (); ClearSavedValue (); } else { Slice raw_value = iter_->value (); if (saved_value_.capacity () > raw_value.size () + 1048576 ) { std::string empty; swap (empty, saved_value_); } SaveKey (ExtractUserKey (iter_->key ()), &saved_key_); saved_value_.assign (raw_value.data (), raw_value.size ()); } } iter_->Prev (); } while (iter_->Valid ()); } if (value_type == kTypeDeletion) { valid_ = false ; saved_key_.clear (); ClearSavedValue (); direction_ = kForward; } else { valid_ = true ; } }
假设当前iter指向 key1:4:1, 则FindPrevUserEntry的工作为:
保存key1:4:1,
1 2 3 4 5 6 7 8 Slice raw_value = iter_->value (); if (saved_value_.capacity () > raw_value.size () + 1048576 ) { std::string empty; swap (empty, saved_value_); } SaveKey (ExtractUserKey (iter_->key ()), &saved_key_); saved_value_.assign (raw_value.data (), raw_value.size ()); }
iter指向key1:5:0,当前是删除节点,清空saved_key和saved_value:
1 2 3 4 if (value_type == kTypeDeletion) { saved_key_.clear (); ClearSavedValue (); }
iter指向key1:6:1, 保存.
iter指向key0:7:1, 遇到不同user_key且不是删除几点,跳出:
1 2 3 4 5 if ((value_type != kTypeDeletion) && user_comparator_->Compare (ikey.user_key, saved_key_) < 0 ) { break ; }
这样就从后往前遍历到当前user_key的最新版本。
7. Prev·
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 void DBIter::Prev () { assert (valid_); if (direction_ == kForward) { assert (iter_->Valid ()); SaveKey (ExtractUserKey (iter_->key ()), &saved_key_); while (true ) { iter_->Prev (); if (!iter_->Valid ()) { valid_ = false ; saved_key_.clear (); ClearSavedValue (); return ; } if (user_comparator_->Compare (ExtractUserKey (iter_->key ()), saved_key_) < 0 ) { break ; } } direction_ = kReverse; } FindPrevUserEntry (); }
注释说得很明白,首先从后往前遍历到和当前user_key不同的节点,设这个节点为节点k。然后通过FindPrevUserEntry找到节点k的最新版本。
8. key & value·
1 2 3 4 5 6 7 8 Slice key () const override { assert (valid_); return (direction_ == kForward) ? ExtractUserKey (iter_->key ()) : saved_key_; } Slice value () const override { assert (valid_); return (direction_ == kForward) ? iter_->value () : saved_value_; }
这个很简单了,不过这里也终于解释了saved_key_,saved_value_已经direction变量的作用。因为在反向遍历时,会出现这种的情况:
反向遍历时,需要的key应该时key1:6:1, 但是iter_已经指向了key0:7:1。 正向遍历则不存在这个问题,所以DBIter使用了三个额外的变量(direction,saved_key,saved_value)来区分正向遍历和反向遍历。
9. ParseKey·
看完前面的几个函数分析,其实已经能够了解DBIter的所有操作了,但是这里还要提一下ParseKey这个函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 inline bool DBIter::ParseKey (ParsedInternalKey* ikey) { Slice k = iter_->key (); size_t bytes_read = k.size () + iter_->value ().size (); while (bytes_until_read_sampling_ < bytes_read) { bytes_until_read_sampling_ += RandomCompactionPeriod (); db_->RecordReadSample (k); } assert (bytes_until_read_sampling_ >= bytes_read); bytes_until_read_sampling_ -= bytes_read; if (!ParseInternalKey (k, ikey)) { status_ = Status::Corruption ("corrupted internal key in DBIter" ); return false ; } else { return true ; } }
ParseKey的主要职责从iter_->key()返回的string中解析出一个InternalKey出来,封装在ikey中。这部分很简单,我想说的是:
1 2 3 4 5 6 7 size_t bytes_read = k.size () + iter_->value ().size ();while (bytes_until_read_sampling_ < bytes_read) { bytes_until_read_sampling_ += RandomCompactionPeriod (); db_->RecordReadSample (k); } assert (bytes_until_read_sampling_ >= bytes_read);bytes_until_read_sampling_ -= bytes_read;
这部分代码的意义是在 DBIter的scan过程中,对整个sstable进行compaction。当然这个compaction肯定有个执行间隔,这个间隔就是由bytes_until_read_sampling_来控制。它由一个随机数初始化:
1 2 3 4 5 6 7 static const int kReadBytesPeriod = 1048576 ; size_t RandomCompactionPeriod () { return rnd_.Uniform (2 * config::kReadBytesPeriod); }
每次增加的间隔也是也是一个基于均匀分布的随机数。
再看看db_->RecordReadSample(k);
1 2 3 4 5 6 void DBImpl::RecordReadSample (Slice key) { MutexLock l (&mutex_) ; if (versions_->current ()->RecordReadSample (key)) { MaybeScheduleCompaction (); } }
MaybeScheduleCompaction是Compaction过程,详情参考Compaction章节,这里不赘述。重点看RecordReadSample_:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 bool Version::RecordReadSample (Slice internal_key) { ParsedInternalKey ikey; if (!ParseInternalKey (internal_key, &ikey)) { return false ; } struct State { GetStats stats; int matches; static bool Match (void * arg, int level, FileMetaData* f) { State* state = reinterpret_cast <State*>(arg); state->matches++; if (state->matches == 1 ) { state->stats.seek_file = f; state->stats.seek_file_level = level; } return state->matches < 2 ; } }; State state; state.matches = 0 ; ForEachOverlapping (ikey.user_key, internal_key, &state, &State::Match); if (state.matches >= 2 ) { return UpdateStats (state.stats); } return false ; }
这里是不是看着比较熟悉?整体框架和基于Seek的Compaction完全相同。只不过Match函数采用了matches次数来记录seek_file指针。
一旦匹配到两次以上,就执行UpdateStats,在UpdateStats内部会对seek_file所指向的file的allowed_seek–。 当allowed_seek减到<=0时,就可以设置用来触发seek compaction的标志了。
DBIter是对InternelIter的封装,两者针对的粒度不同。InternelIter以InternelKey为粒度,一个InternelKey由(user_key,sequence,type)组成,只要这三者有一个不同,InternelIter就认为它们是不同key。但对用户来说,只关心user_key是否相同,所以诞生了DBIter,用来在具有相同user_key的internelkey中,找到最新版本的那个并返回给用户。
到这里,leveldb中的所有iterator都已介绍完毕。