// Recovers the on-disk state of the database. Must be called with mutex_
// held (asserted below).
//
// Steps visible in this excerpt:
//   1. Create the DB directory (errors ignored) and take the DB lock file.
//   2. If CURRENT does not exist, create a new DB when
//      options_.create_if_missing is set, otherwise fail; if it does exist
//      and options_.error_if_exists is set, fail.
//   3. Recover the VersionSet.
//   4. Replay, in ascending file-number order, every log file whose number
//      is >= versions_->LogNumber() or == versions_->PrevLogNumber().
//   5. Raise the VersionSet's last sequence number to the largest sequence
//      observed during replay.
//
// edit:          passed through to RecoverLogFile().
// save_manifest: passed through to VersionSet::Recover() and
//                RecoverLogFile().
//
// Every failure path visible here returns the failing Status immediately.
//
// NOTE(review): this excerpt stops after the last-sequence update; the
// function's final return statement and closing brace are not part of this
// view.
Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
  mutex_.AssertHeld();

  // Ignore error from CreateDir since the creation of the DB is
  // committed only when the descriptor is created, and this directory
  // may already exist from a previous failed creation attempt.
  env_->CreateDir(dbname_);  // Create the database directory.
  assert(db_lock_ == nullptr);
  Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
  if (!s.ok()) {
    return s;
  }

  if (!env_->FileExists(CurrentFileName(dbname_))) {
    // No CURRENT file: first run on this directory.
    if (options_.create_if_missing) {
      s = NewDB();  // Create a new DB.
      if (!s.ok()) {
        return s;
      }
    } else {
      return Status::InvalidArgument(
          dbname_, "does not exist (create_if_missing is false)");
    }
  } else {
    if (options_.error_if_exists) {
      return Status::InvalidArgument(dbname_,
                                     "exists (error_if_exists is true)");
    }
  }

  // Part one: recover the version set.
  s = versions_->Recover(save_manifest);
  if (!s.ok()) {
    return s;
  }
  SequenceNumber max_sequence(0);

  // Part two: restore data from logs that were never registered in the
  // descriptor (i.e. recover from the log files after a crash).
  //
  // Recover from all newer log files than the ones named in the
  // descriptor (new log files may have been added by the previous
  // incarnation without registering them in the descriptor).
  //
  // Note that PrevLogNumber() is no longer used, but we pay
  // attention to it in case we are recovering a database
  // produced by an older version of leveldb.
  const uint64_t min_log = versions_->LogNumber();
  const uint64_t prev_log = versions_->PrevLogNumber();
  std::vector<std::string> filenames;
  s = env_->GetChildren(dbname_, &filenames);
  if (!s.ok()) {
    return s;
  }
  std::set<uint64_t> expected;
  versions_->AddLiveFiles(&expected);
  uint64_t number;
  FileType type;
  std::vector<uint64_t> logs;
  for (size_t i = 0; i < filenames.size(); i++) {
    if (ParseFileName(filenames[i], &number, &type)) {
      // Any file number still left in `expected` after this loop was not
      // found in the directory listing.
      expected.erase(number);
      if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
        logs.push_back(number);
    }
  }
  if (!expected.empty()) {
    char buf[50];
    std::snprintf(buf, sizeof(buf), "%d missing files; e.g.",
                  static_cast<int>(expected.size()));
    return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
  }

  // Recover in the order in which the logs were generated
  std::sort(logs.begin(), logs.end());
  for (size_t i = 0; i < logs.size(); i++) {
    // The second argument is true only for the last (newest) log.
    s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
                       &max_sequence);
    if (!s.ok()) {
      return s;
    }

    // The previous incarnation may not have written any MANIFEST
    // records after allocating this log number.  So we manually
    // update the file number allocation counter in VersionSet.
    versions_->MarkFileNumberUsed(logs[i]);
  }

  if (versions_->LastSequence() < max_sequence) {
    versions_->SetLastSequence(max_sequence);
  }
// Recovers VersionSet state starting from the CURRENT file.
//
// Visible steps: read CURRENT, validate that it ends with a newline, open
// the manifest (descriptor) file it names, then check that the required
// descriptor entries were seen and decide whether the existing manifest can
// be reused.
//
// save_manifest: set to true when ReuseManifest() returns false.
//
// NOTE(review): this is an excerpt. The code between opening the descriptor
// and the final checks is omitted, and the function's tail (final return and
// closing brace) is not part of this view.
Status VersionSet::Recover(bool* save_manifest) {
  // Records the first corruption Status handed to Corruption(); later
  // reports are dropped once *status is no longer ok.
  struct LogReporter : public log::Reader::Reporter {
    Status* status;
    void Corruption(size_t bytes, const Status& s) override {
      if (this->status->ok()) *this->status = s;
    }
  };

  // Read "CURRENT" file, which contains a pointer to the current manifest file
  std::string current;  // Holds the name of the current manifest file.
  Status s = ReadFileToString(env_, CurrentFileName(dbname_), &current);
  if (!s.ok()) {
    return s;
  }
  if (current.empty() || current[current.size() - 1] != '\n') {
    return Status::Corruption("CURRENT file does not end with newline");
  }
  current.resize(current.size() - 1);  // Strip the trailing newline.

  // Build the path of the manifest file.
  std::string dscname = dbname_ + "/" + current;
  SequentialFile* file;
  s = env_->NewSequentialFile(dscname, &file);
  if (!s.ok()) {
    if (s.IsNotFound()) {
      return Status::Corruption("CURRENT points to a non-existent file",
                                s.ToString());
    }
    return s;
  }

  // NOTE(review): the excerpt skips code here. have_next_file,
  // have_log_number, have_last_sequence, have_prev_log_number and
  // prev_log_number are used below but declared in the omitted part.

  if (s.ok()) {
    if (!have_next_file) {
      s = Status::Corruption("no meta-nextfile entry in descriptor");
    } else if (!have_log_number) {
      s = Status::Corruption("no meta-lognumber entry in descriptor");
    } else if (!have_last_sequence) {
      s = Status::Corruption("no last-sequence-number entry in descriptor");
    }

    if (!have_prev_log_number) {
      prev_log_number = 0;
    }

    // Decide whether the manifest needs to be rewritten.
    // See if we can reuse the existing MANIFEST file.
    if (ReuseManifest(dscname, current)) {
      // Original annotation: the manifest is reusable when it is not too
      // large. (ReuseManifest itself is not shown here.)
      // No need to save new manifest
    } else {
      *save_manifest = true;
    }
  }
// 第二部分 // 从那些未注册的log中还原数据(即系统crash后,从log文件中恢复数据) // Recover from all newer log files than the ones named in the // descriptor (new log files may have been added by the previous // incarnation without registering them in the descriptor). // // Note that PrevLogNumber() is no longer used, but we pay // attention to it in case we are recovering a database // produced by an older version of leveldb. constuint64_t min_log = versions_->LogNumber(); // 正常系统环境下的最小log号,所有异常环境下(crash)的log号都比这个大 constuint64_t prev_log = versions_->PrevLogNumber(); std::vector<std::string> filenames; s = env_->GetChildren(dbname_, &filenames); // 获取所有数据库文件名 if (!s.ok()) { return s; } std::set<uint64_t> expected; versions_->AddLiveFiles(&expected); // 加入所有版本中的sstable文件number(到这里还只有一个版本) uint64_t number; FileType type; std::vector<uint64_t> logs; // logs保存需要执行恢复的log文件名 for (size_t i = 0; i < filenames.size(); i++) { if (ParseFileName(filenames[i], &number, &type)) { expected.erase(number); if (type == kLogFile && ((number >= min_log) || (number == prev_log))) logs.push_back(number); // 得到需要执行恢复的log文件 } } if (!expected.empty()) { char buf[50]; std::snprintf(buf, sizeof(buf), "%d missing files; e.g.", static_cast<int>(expected.size())); return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin()))); }
// Recover in the order in which the logs were generated std::sort(logs.begin(), logs.end()); for (size_t i = 0; i < logs.size(); i++) { s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit, &max_sequence); // 正式从log中恢复数据 if (!s.ok()) { return s; }
// The previous incarnation may not have written any MANIFEST // records after allocating this log number. So we manually // update the file number allocation counter in VersionSet. versions_->MarkFileNumberUsed(logs[i]); }
if (versions_->LastSequence() < max_sequence) { versions_->SetLastSequence(max_sequence); }
// Read all the records and add to a memtable std::string scratch; Slice record; WriteBatch batch; int compactions = 0; MemTable* mem = nullptr; while (reader.ReadRecord(&record, &scratch) && status.ok()) { // 读取log文件中的每条记录 if (record.size() < 12) { reporter.Corruption(record.size(), Status::Corruption("log record too small")); continue; } WriteBatchInternal::SetContents(&batch, record);
if (mem == nullptr) { mem = newMemTable(internal_comparator_); mem->Ref(); } // 插入到memtable中 status = WriteBatchInternal::InsertInto(&batch, mem); MaybeIgnoreError(&status); if (!status.ok()) { break; } const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) + WriteBatchInternal::Count(&batch) - 1; if (last_seq > *max_sequence) { *max_sequence = last_seq; }
// memtable满, 需要执行compaction if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) { compactions++; *save_manifest = true; status = WriteLevel0Table(mem, edit, nullptr); mem->Unref(); mem = nullptr; if (!status.ok()) { // Reflect errors immediately so that conditions like full // file-systems cause the DB::Open() to fail. break; } } }
voidDBImpl::RemoveObsoleteFiles(){ ... for (std::string& filename : filenames) { if (ParseFileName(filename, &number, &type)) { bool keep = true; switch (type) { case kLogFile: // log file删除 keep = ((number >= versions_->LogNumber()) || (number == versions_->PrevLogNumber())); break; case kDescriptorFile: // manifest file删除 // Keep my manifest file, and any newer incarnations' // (in case there is a race that allows other incarnations) keep = (number >= versions_->ManifestFileNumber()); break; case kTableFile: keep = (live.find(number) != live.end()); break; case kTempFile: // Any temp files that are currently being written to must // be recorded in pending_outputs_, which is inserted into "live" keep = (live.find(number) != live.end()); break; case kCurrentFile: case kDBLockFile: case kInfoLogFile: keep = true; break; }
// While deleting all files unblock other threads. All files being deleted // have unique names which will not collide with newly created files and // are therefore safe to delete while allowing other threads to proceed. mutex_.Unlock(); for (const std::string& filename : files_to_delete) { env_->RemoveFile(dbname_ + "/" + filename); // 正式删除文件 } mutex_.Lock(); }
voidDBImpl::MaybeScheduleCompaction(){ mutex_.AssertHeld(); if (background_compaction_scheduled_) { // Already scheduled } elseif (shutting_down_.load(std::memory_order_acquire)) { // DB is being deleted; no more background compactions } elseif (!bg_error_.ok()) { // Already got an error; no more changes } elseif (imm_ == nullptr && manual_compaction_ == nullptr && !versions_->NeedsCompaction()) { // No work to be done } else { background_compaction_scheduled_ = true; env_->Schedule(&DBImpl::BGWork, this); } }