A previously shared article, "An Analysis of LevelDB's Implementation", described how LevelDB works in detail. This article follows the Put interface through the source code to see how LevelDB's write path is implemented.
LevelDB's read and write interfaces are declared in the DB class (leveldb/db.h):
// Set the database entry for "key" to "value". Returns OK on success,
// and a non-OK status on error.
// Note: consider setting options.sync = true.
virtual Status Put(const WriteOptions& options,
const Slice& key,
const Slice& value) = 0;
// Remove the database entry (if any) for "key". Returns OK on
// success, and a non-OK status on error. It is not an error if "key"
// did not exist in the database.
// Note: consider setting options.sync = true.
virtual Status Delete(const WriteOptions& options, const Slice& key) = 0;
// Apply the specified updates to the database.
// Returns OK on success, non-OK on failure.
// Note: consider setting options.sync = true.
virtual Status Write(const WriteOptions& options, WriteBatch* updates) = 0;
// If the database contains an entry for "key" store the
// corresponding value in *value and return OK.
//
// If there is no entry for "key" leave *value unchanged and return
// a status for which Status::IsNotFound() returns true.
//
// May return some other Status on an error.
virtual Status Get(const ReadOptions& options,
const Slice& key, std::string* value) = 0;
The implementations live in the DBImpl class (leveldb/db_impl.h). This article follows the Put interface to explore LevelDB's internals.
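Before diving into the write path, here is a minimal usage sketch of these interfaces, so the rest of the walkthrough has a concrete caller in mind. The database path /tmp/testdb and the key/value strings are arbitrary placeholders:

#include <cassert>
#include <string>
#include "leveldb/db.h"

int main() {
  leveldb::DB* db;
  leveldb::Options options;
  options.create_if_missing = true;  // create the database if it does not exist
  leveldb::Status s = leveldb::DB::Open(options, "/tmp/testdb", &db);
  assert(s.ok());
  // Put enters the write path analyzed in this article.
  s = db->Put(leveldb::WriteOptions(), "key1", "value1");
  assert(s.ok());
  std::string value;
  s = db->Get(leveldb::ReadOptions(), "key1", &value);
  assert(s.ok() && value == "value1");
  delete db;
  return 0;
}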
The Put method in DBImpl is implemented as follows:
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
return DB::Put(o, key, val);
}
It does nothing itself; it simply delegates to the base class DB's Put, which is implemented as follows:
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
WriteBatch batch;
batch.Put(key, value);
return Write(opt, &batch);
}
The base class implementation is also simple: it places the key-value pair to be written into a WriteBatch object and then calls Write. Before looking at Write, let's first look at a few classes it uses: WriteOptions, WriteBatch, and Slice. They are defined as follows:
struct WriteOptions {
// If true, the write will be flushed from the operating system
// buffer cache (by calling WritableFile::Sync()) before the write
// is considered complete. If this flag is true, writes will be
// slower.
//
// If this flag is false, and the machine crashes, some recent
// writes may be lost. Note that if it is just the process that
// crashes (i.e., the machine does not reboot), no writes will be
// lost even if sync==false.
//
// In other words, a DB write with sync==false has similar
// crash semantics as the "write()" system call. A DB write
// with sync==true has similar crash semantics to a "write()"
// system call followed by "fsync()".
//
// Default: false
bool sync;
WriteOptions()
: sync(false) {
}
};
WriteOptions is a struct with a single member, sync. As the comment explains, sync controls whether the write must be flushed to disk before it is considered complete.
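For example, a caller that cannot afford to lose the most recent writes in a machine crash can opt into the slower synchronous path. A small sketch against the public API (db is an already-opened leveldb::DB*):

leveldb::WriteOptions write_options;
write_options.sync = true;  // WritableFile::Sync() is called before the write completes
leveldb::Status s = db->Put(write_options, "important-key", "important-value");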
class WriteBatch {
public:
WriteBatch();
~WriteBatch();
// Store the mapping "key->value" in the database.
void Put(const Slice& key, const Slice& value);
.......
private:
friend class WriteBatchInternal;
std::string rep_; // See comment in write_batch.cc for the format of rep_
// Intentionally copyable
};
As the name suggests, WriteBatch is a container for key-value pairs waiting to be written, used to apply updates to the DB in batch. Its member rep_ holds the encoded records; per the comment in write_batch.cc, the layout is:
8-byte sequence number | 4-byte record count | 1-byte record type | varint key length | key bytes | varint value length | value bytes | 1-byte record type | ... |
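To make the layout concrete, below is an illustrative sketch that encodes a batch containing a single Put. The three helpers are simplified stand-ins for leveldb's real ones in util/coding.h (PutFixed64, PutFixed32, PutVarint32), and the 0x1 tag corresponds to kTypeValue in db/dbformat.h:

#include <cstdint>
#include <string>

// Simplified little-endian stand-ins for leveldb's util/coding.h helpers.
static void PutFixed64(std::string* dst, uint64_t v) {
  for (int i = 0; i < 8; i++) dst->push_back(static_cast<char>((v >> (8 * i)) & 0xff));
}
static void PutFixed32(std::string* dst, uint32_t v) {
  for (int i = 0; i < 4; i++) dst->push_back(static_cast<char>((v >> (8 * i)) & 0xff));
}
static void PutVarint32(std::string* dst, uint32_t v) {
  while (v >= 0x80) { dst->push_back(static_cast<char>(v | 0x80)); v >>= 7; }
  dst->push_back(static_cast<char>(v));
}

// Encode a batch holding one Put record, mirroring the rep_ layout above.
std::string EncodeSinglePut(const std::string& key, const std::string& value) {
  std::string rep;
  PutFixed64(&rep, 0);  // sequence number; set later via WriteBatchInternal::SetSequence
  PutFixed32(&rep, 1);  // record count
  rep.push_back(0x1);   // record type: kTypeValue
  PutVarint32(&rep, static_cast<uint32_t>(key.size()));
  rep += key;           // key bytes
  PutVarint32(&rep, static_cast<uint32_t>(value.size()));
  rep += value;         // value bytes
  return rep;
}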
Slice is a lightweight buffer wrapper in LevelDB. It can be thought of as a string, except that it carries an explicit length, so the data it points to may contain '\0' bytes.
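A tiny sketch of why the explicit length matters: the buffer below contains an embedded '\0', which would truncate a C string but is preserved by a Slice:

#include "leveldb/slice.h"

const char buf[3] = {'a', '\0', 'b'};
leveldb::Slice s(buf, 3);  // explicit length: all 3 bytes, including '\0', are kept
// s.size() == 3, whereas strlen(buf) would report 1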
Back to the main thread: after DB::Put places the key-value pair into a WriteBatch, it calls DBImpl::Write, which is implemented as follows:
Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
//Wrap this write's WriteBatch in a Writer object
Writer w(&mutex_);
w.batch = my_batch;
w.sync = options.sync;
w.done = false;
MutexLock l(&mutex_);
//Push this write onto the tail of the pending-write queue. writers_ is a std::deque<Writer*>; since this method can be called from multiple threads, writes must be queued.
writers_.push_back(&w);
while (!w.done && &w != writers_.front()) {
//Keep waiting while this write is not yet done and has not reached the front of the queue
w.cv.Wait();
}
if (w.done) {
return w.status;
}
// May temporarily unlock and wait.
//Ensure the in-memory memtable has enough room for this write; MakeRoomForWrite handles this (examined in detail below)
Status status = MakeRoomForWrite(my_batch == NULL);
//Version/VersionEdit/VersionSet will be covered separately later. For now: a Version records the state currently in use, a VersionEdit represents the delta between two versions, and a VersionSet holds all live version information.
uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;
if (status.ok() && my_batch != NULL) { // NULL batch is for compactions
//Merge multiple queued WriteBatches into one to improve throughput (examined below)
WriteBatch* updates = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(updates, last_sequence + 1);
//Advance the sequence number by the record count; used below to update the last sequence
last_sequence += WriteBatchInternal::Count(updates);
// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
// into mem_.
{
mutex_.Unlock();
//To guard against losing in-memory data, leveldb writes to the log first; only after the log write succeeds is the in-memory state updated
status = log_->AddRecord(WriteBatchInternal::Contents(updates));
bool sync_error = false;
if (status.ok() && options.sync) {
//If WriteOptions requested a synchronous write, force the log file's data to disk here
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
if (status.ok()) {
//After the log write succeeds, apply the updates to the in-memory MemTable (mem_), which is essentially a SkipList
status = WriteBatchInternal::InsertInto(updates, mem_);
}
mutex_.Lock();
if (sync_error) {
// The state of the log file is indeterminate: the log record we
// just added may or may not show up when the DB is re-opened.
// So we force the DB into a mode where all future writes fail.
RecordBackgroundError(status);
}
}
if (updates == tmp_batch_) tmp_batch_->Clear();
//Update the last sequence number of the current in-memory version
versions_->SetLastSequence(last_sequence);
}
while (true) {
//Remove completed writes from the queue
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
//Signal the waiting writer whose batch was handled as part of this group so it can return
ready->cv.Signal();
}
if (ready == last_writer) break;
}
// Notify new head of write queue
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}
return status;
}
The main flow of Put lives in Write. Next, let's look at the methods Write calls.
First, MakeRoomForWrite. This method ensures the in-memory memtable has enough room for the current write: it compares the memtable's approximate memory usage against the configured write buffer size, and when there is not enough room it creates a new memtable, converting the old one into an immutable memtable and triggering a compaction when necessary. Here is the code:
Status DBImpl::MakeRoomForWrite(bool force) {
mutex_.AssertHeld();
assert(!writers_.empty());
bool allow_delay = !force;
Status s;
while (true) {
if (!bg_error_.ok()) {
// Yield previous error
s = bg_error_;
break;
} else if (
allow_delay &&
versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
// We are getting close to hitting a hard limit on the number of
// L0 files. Rather than delaying a single write by several
// seconds when we hit the hard limit, start delaying each
// individual write by 1ms to reduce latency variance. Also,
// this delay hands over some CPU to the compaction thread in
// case it is sharing the same core as the writer.
mutex_.Unlock();
//If delaying this write is allowed and the number of level-0 files has reached the slowdown threshold, deliberately sleep for 1ms here to give compaction some time
env_->SleepForMicroseconds(1000);
allow_delay = false; // Do not delay a single write more than once
mutex_.Lock();
} else if (!force &&
(mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
// There is still room in the current memtable
break;
} else if (imm_ != NULL) {
// We have filled up the current memtable, but the previous
// one is still being compacted, so we wait.
//A non-NULL imm_ means an immutable memtable is already waiting to be compacted into a level-0 file; we must wait for that compaction to finish before continuing
Log(options_.info_log, "Current memtable full; waiting...\n");
bg_cv_.Wait();
} else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
// There are too many level-0 files.
// The level-0 file count exceeds the limit; wait for the background compaction thread to finish merging files across levels
Log(options_.info_log, "Too many L0 files; waiting...\n");
bg_cv_.Wait();
} else {
//The memtable is full but there is no pending immutable memtable, so convert the current memtable into an immutable one and create a fresh memtable along with a new log file
// Attempt to switch to a new memtable and trigger compaction of old
assert(versions_->PrevLogNumber() == 0);
uint64_t new_log_number = versions_->NewFileNumber();
WritableFile* lfile = NULL;
s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
if (!s.ok()) {
// Avoid chewing through file number space in a tight loop.
versions_->ReuseFileNumber(new_log_number);
break;
}
delete log_;
delete logfile_;
logfile_ = lfile;
logfile_number_ = new_log_number;
log_ = new log::Writer(lfile);
imm_ = mem_;
has_imm_.Release_Store(imm_);
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
force = false; // Do not force another compaction if have room
//A new immutable memtable now exists; schedule a background compaction if one is not already running
MaybeScheduleCompaction();
}
}
return s;
}
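For reference, the two level-0 thresholds consulted above are constants in db/dbformat.h; the values below are from the leveldb source this walkthrough follows:

// db/dbformat.h (excerpt), namespace config
static const int kL0_CompactionTrigger = 4;      // L0 file count that triggers a compaction
static const int kL0_SlowdownWritesTrigger = 8;  // start delaying each write by 1ms
static const int kL0_StopWritesTrigger = 12;     // stop writes until compaction catches up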
When writing, LevelDB does not issue one write per WriteBatch. To improve performance, it merges multiple small WriteBatches into one large WriteBatch. This is done in BuildBatchGroup:
// REQUIRES: Writer list must be non-empty
// REQUIRES: First writer must have a non-NULL batch
WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
assert(!writers_.empty());
Writer* first = writers_.front();
WriteBatch* result = first->batch;
assert(result != NULL);
size_t size = WriteBatchInternal::ByteSize(first->batch);
// Allow the group to grow up to a maximum size, but if the
// original write is small, limit the growth so we do not slow
// down the small write too much.
//The merged WriteBatch is capped at 1MB (or the first batch's size plus 128KB for small writes)
size_t max_size = 1 << 20;
if (size <= (128<<10)) {
max_size = size + (128<<10);
}
*last_writer = first;
std::deque<Writer*>::iterator iter = writers_.begin();
++iter; // Advance past "first"
for (; iter != writers_.end(); ++iter) {
Writer* w = *iter;
if (w->sync && !first->sync) {
// Do not include a sync write into a batch handled by a non-sync write.
//Keep sync semantics consistent: a sync write must not be grouped under a non-sync leader
break;
}
if (w->batch != NULL) {
size += WriteBatchInternal::ByteSize(w->batch);
if (size > max_size) {
// Do not make batch too big
break;
}
// Append to *result
if (result == first->batch) {
// Switch to temporary batch instead of disturbing caller's batch
//The merged result is accumulated in tmp_batch_, leaving the callers' batches untouched
result = tmp_batch_;
assert(WriteBatchInternal::Count(result) == 0);
WriteBatchInternal::Append(result, first->batch);
}
WriteBatchInternal::Append(result, w->batch);
}
*last_writer = w;
}
return result;
}
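Because Write is part of the public interface, an application can also build its own WriteBatch to apply several updates atomically; the batch goes through the same log-then-memtable path shown above. A short sketch using the public API:

#include <string>
#include "leveldb/db.h"
#include "leveldb/write_batch.h"

// Atomically move the value stored under "key1" to "key2": both mutations
// land in one log record, so either both or neither survive a crash.
leveldb::Status MoveValue(leveldb::DB* db, const std::string& value) {
  leveldb::WriteBatch batch;
  batch.Delete("key1");
  batch.Put("key2", value);
  return db->Write(leveldb::WriteOptions(), &batch);
}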
That covers the main flow of LevelDB's Put. Follow-up articles will analyze the MemTable, log, compaction, Version, and manifest file in detail.
If you're interested, follow the WeChat public account to exchange ideas and learn together; follow-up articles will also be published there.