LevelDB源码分析之从Put说起

2023-05-16

之前分享的文章leveldb实现原理分析详细描述了LevelDB的实现原理,本文从Put接口来看下leveldb数据写流程的源码实现。

LevelDB的读写接口在类DB中定义(leveldb/db.h),接口声明如下:

// Set the database entry for "key" to "value".  Returns OK on success,
  // and a non-OK status on error.
  // Note: consider setting options.sync = true.
  virtual Status Put(const WriteOptions& options,
                     const Slice& key,
                     const Slice& value) = 0;

  // Remove the database entry (if any) for "key".  Returns OK on
  // success, and a non-OK status on error.  It is not an error if "key"
  // did not exist in the database.
  // Note: consider setting options.sync = true.
  virtual Status Delete(const WriteOptions& options, const Slice& key) = 0;

  // Apply the specified updates to the database.
  // Returns OK on success, non-OK on failure.
  // Note: consider setting options.sync = true.
  virtual Status Write(const WriteOptions& options, WriteBatch* updates) = 0;

  // If the database contains an entry for "key" store the
  // corresponding value in *value and return OK.
  //
  // If there is no entry for "key" leave *value unchanged and return
  // a status for which Status::IsNotFound() returns true.
  //
  // May return some other Status on an error.
  virtual Status Get(const ReadOptions& options,
                     const Slice& key, std::string* value) = 0;​

而接口的实现则在DBImpl类中(leveldb/db_impl.h)。这篇文章跟随Put接口,去了解leveldb的内部实现。

DBImpl类中的Put方法实现如下:

Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
  return DB::Put(o, key, val);
}
它什么也没有做只是调用了父类DB的Put实现,DB::Put的实现如下:
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  batch.Put(key, value);
  return Write(opt, &batch);
}

父类的实现也很简单,就是将待写入的key-value放入WriteBatch对象中,然后调用了Write方法。在继续看Write方法实现之前,我们先看下这个方法用到的几个类:WriteOption,WriteBatch和Slice.它们的定义如下:

struct WriteOptions {
  // If true, the write will be flushed from the operating system
  // buffer cache (by calling WritableFile::Sync()) before the write
  // is considered complete.  If this flag is true, writes will be
  // slower.
  //
  // If this flag is false, and the machine crashes, some recent
  // writes may be lost.  Note that if it is just the process that
  // crashes (i.e., the machine does not reboot), no writes will be
  // lost even if sync==false.
  //
  // In other words, a DB write with sync==false has similar
  // crash semantics as the "write()" system call.  A DB write
  // with sync==true has similar crash semantics to a "write()"
  // system call followed by "fsync()".
  //
  // Default: false
  bool sync;

  WriteOptions()
      : sync(false) {
  }
}

WriteOptions是个结构体,内部只有一个成员sync,从注释中可以知道,这个sync变量是用于决定是否需要将数据立即同步到磁盘的标记。

class WriteBatch {
 public:
  WriteBatch();
  ~WriteBatch();

  // Store the mapping "key->value" in the database.
  void Put(const Slice& key, const Slice& value);
  .......
 private:
  friend class WriteBatchInternal;

  std::string rep_;  // See comment in write_batch.cc for the format of rep_

  // Intentionally copyable
};

从名字上可以看出它是一个待写key-value的存储器,用于批量写入到DB中。它的成员rep_负责存储这些数据,rep_内部存储结构如下:

8字节key-value个数1字节key-value类型4字节key长度key数据4字节value长度value数据1字节key-value类型.................

Slice是leveldb封装的一个buffer,可以理解为string,它内存存储的数据允许'\0'存在。

言归正传,在DB::Put方法中将待写入的key-value放入WriteBatch之后,调用了DBImpl类中的Write方法,Write方法实现如下:

Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
  //先将本次需要写的WriteBatch放入Writer对象
  Writer w(&mutex_);
  w.batch = my_batch;
  w.sync = options.sync;
  w.done = false;

  MutexLock l(&mutex_);
  //将当前待写的key-value放入待写队列尾部,此处writers_是一个dequeue<Writer*>的对象,由于这个方法会被多线程调用,所以写入操作需要进行排队
  writers_.push_back(&w);
  while (!w.done && &w != writers_.front()) {
     //如果当前写操作未完成,并且当前写操作还没有排到队列首部,则继续等待
     w.cv.Wait();
  }
  if (w.done) {
    return w.status;
  }

  // May temporarily unlock and wait.
  //这里需要确保当前内存中的Memtable有足够的空间可写,MakeRoomForWrite负责做这件事,这个方法在后续再细说
  Status status = MakeRoomForWrite(my_batch == NULL);
  //levelDB中的Version/VersionEdit/VersionSet后面再单独细说,目前只需要知道leveldb通过Version来存储指示当前使用的版本,VersionEdit表示不同版本之间的差异,VersionSet则存储了所有历史版本信息
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  if (status.ok() && my_batch != NULL) {  // NULL batch is for compactions
    //这里是将多个WriteBatch合并为一个WriteBatch,以达到提升效率的目的,这个方法后面再看 
    WriteBatch* updates = BuildBatchGroup(&last_writer);
    WriteBatchInternal::SetSequence(updates, last_sequence + 1);
    //增加序列号,用户后续更新当前版本号
    last_sequence += WriteBatchInternal::Count(updates);

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      mutex_.Unlock();
      //leveldb为了防止内存数据丢失,先将数据写入到log中,当log写成功之后,再更新内存数据
      status = log_->AddRecord(WriteBatchInternal::Contents(updates));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        //如果WriteOption中指定需要同步写,则这里需要强制将log文件中的数据同步到磁盘
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      if (status.ok()) {
        //log写成功之后,将数据写入到内存中的MemTable中,mem_即为内存中的Memtable,本质上是一个SkipList
        status = WriteBatchInternal::InsertInto(updates, mem_);
      }
      mutex_.Lock();
      if (sync_error) {
        // The state of the log file is indeterminate: the log record we
        // just added may or may not show up when the DB is re-opened.
        // So we force the DB into a mode where all future writes fail.
        RecordBackgroundError(status);
      }
    }
    if (updates == tmp_batch_) tmp_batch_->Clear();
    //更新当前内存中的版本的序列号
    versions_->SetLastSequence(last_sequence);
  }

  while (true) {
    //将已完成的写操作从队列中清除
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      //通知其他正在等待的写操作,可以往下执行  
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }

  // Notify new head of write queue
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }

  return status;
}

Put的主流程都在Write方法中,下面看下在Write方法中调用的几个方法。

首先看下MakeRoomForWrite方法,这个方法是用于确保内存中的Memtable有足够的内存空间用于本次的写操作。它会检测当前Memtable可用空间,并与当前写操作需要占用的空间进行比较,在空间不够时,创建新的Memtable,并且将原先的Memtable转换为Immutable Memtable。在必要的情况下出发Compaction操作。以下是方法的代码

Status DBImpl::MakeRoomForWrite(bool force) {
  mutex_.AssertHeld();
  assert(!writers_.empty());
  bool allow_delay = !force;
  Status s;
  while (true) {
    if (!bg_error_.ok()) {
      // Yield previous error
      s = bg_error_;
      break;
    } else if (
        allow_delay &&
        versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
      // We are getting close to hitting a hard limit on the number of
      // L0 files.  Rather than delaying a single write by several
      // seconds when we hit the hard limit, start delaying each
      // individual write by 1ms to reduce latency variance.  Also,
      // this delay hands over some CPU to the compaction thread in
      // case it is sharing the same core as the writer.
      mutex_.Unlock();
      //如果当前的允许写延迟,并且Level0的文件数达到了配置的阈值,则在此处主动休眠1s,以给Compaction一点时间
      env_->SleepForMicroseconds(1000);
      allow_delay = false;  // Do not delay a single write more than once
      mutex_.Lock();
    } else if (!force &&
               (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
      // There is room in current memtable
      //当前Memtable中还有足够的内存
      break;
    } else if (imm_ != NULL) {
      // We have filled up the current memtable, but the previous
      // one is still being compacted, so we wait.
      //imm_不为空,表示当前内存中已经有一份Immutable Memtable等待被Compaction到Level0文件中,此时必须等待上一个immutable Memtable合并到level0的文件之后才能继续
      Log(options_.info_log, "Current memtable full; waiting...\n");
      bg_cv_.Wait();
    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
      // There are too many level-0 files.
      // Level0的文件数超过限制,此时需要等待后台的Compaction线程完成磁盘中不同层文件的整合
      Log(options_.info_log, "Too many L0 files; waiting...\n");
      bg_cv_.Wait();
    } else {
      //此时说明空间不够,但当前内存中没有未Compaction的Immutable Memtable文件,所以将当前内存中的Memtable文件转换为Immutable,并重新创建Memtable和对应的Log对象
      // Attempt to switch to a new memtable and trigger compaction of old
      assert(versions_->PrevLogNumber() == 0);
      uint64_t new_log_number = versions_->NewFileNumber();
      WritableFile* lfile = NULL;
      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
      if (!s.ok()) {
        // Avoid chewing through file number space in a tight loop.
        versions_->ReuseFileNumber(new_log_number);
        break;
      }
      delete log_;
      delete logfile_;
      logfile_ = lfile;
      logfile_number_ = new_log_number;
      log_ = new log::Writer(lfile);
      imm_ = mem_;
      has_imm_.Release_Store(imm_);
      mem_ = new MemTable(internal_comparator_);
      mem_->Ref();
      force = false;   // Do not force another compaction if have room
      //如果有新的Immutable Memtable生成,并且后台Compaction线程没有执行,则启动Compaction线程
      MaybeScheduleCompaction();
    }
  }
  return s;
}
LevelDB在写数据时,并非一个WriteBatch写一次,为了提升性能,它会将多个晓得WriteBatch合并为一个大的WriteBatch,这个操作在BuildBatchGroup方法中实现,代码如下:
// REQUIRES: Writer list must be non-empty
// REQUIRES: First writer must have a non-NULL batch
WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
  assert(!writers_.empty());
  Writer* first = writers_.front();
  WriteBatch* result = first->batch;
  assert(result != NULL);

  size_t size = WriteBatchInternal::ByteSize(first->batch);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  //合并后单个WriteBatch大小限制为1M
  size_t max_size = 1 << 20;
  if (size <= (128<<10)) {
    max_size = size + (128<<10);
  }

  *last_writer = first;
  std::deque<Writer*>::iterator iter = writers_.begin();
  ++iter;  // Advance past "first"
  for (; iter != writers_.end(); ++iter) {
    Writer* w = *iter;
    if (w->sync && !first->sync) {
      // Do not include a sync write into a batch handled by a non-sync write.
      //合并后的WriteBatch的sync标记需要保持一致
      break;
    }

    if (w->batch != NULL) {
      size += WriteBatchInternal::ByteSize(w->batch);
      if (size > max_size) {
        // Do not make batch too big
        break;
      }

      // Append to *result
      if (result == first->batch) {
        // Switch to temporary batch instead of disturbing caller's batch
        //合并后的WriteBatch实际上是存放在tmp_batch_中
        result = tmp_batch_;
        assert(WriteBatchInternal::Count(result) == 0);
        WriteBatchInternal::Append(result, first->batch);
      }
      WriteBatchInternal::Append(result, w->batch);
    }
    *last_writer = w;
  }
  return result;
}

以上就是levelDB的Put的主流程,在后续的文章中,我们再详细分析Memtable Log  Compaction  Version ManifestFile等内容。

如有兴趣,可关注微信公众号,可以相互交流学习,后续文章更新会同步更新到公众号中。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

LevelDB源码分析之从Put说起 的相关文章

  • 请求类型get,delete,post,put 的用法(传参)

    1 get delete传参 注意 xff1a xff08 1 xff09 传参格式 xff1a 96 URL key 61 value amp key2 61 value2 96 xff08 2 xff09 注意使用反引号 xff0c 而
  • linux/mm/memory.c/put_page

    put page用来完成物理页面与一个线性地址页面的挂接 xff0c 从而将一个 线性地址空间内的页面落实到物理地址空间内 xff0c copy page tables函数 只是为一个进程提供了在线性地址空间的一个页表及1024页内存 xf
  • 【数据库】Redis和RocksDB、levelDB的区别

    区别 Redis 是一个服务 xff0c 独立的进程 xff0c 用户的程序需要与它建立连接才能向它发请求 xff0c 读写数据 RocksDB 和LevelDB 是一个库 xff0c 嵌入在用户的程序中 xff0c 用户程序直接调用接口读
  • leveldb源码分析--SSTable之Compaction 详解

    http www cnblogs com KevinT p 3819134 html leveldb源码分析 SSTable之Compaction 对于compaction是leveldb中体量最大的一部分 也应该是最为复杂的部分 为了便于
  • 大数据时代的Tcaplus游戏存储

    大数据时代的Tcaplus游戏存储 shiweizhang 2015 10 27 1 7k浏览 游戏开发数据分析场景 想免费获取内部独家PPT资料库 观看行业大牛直播 点击加入腾讯游戏学院游戏开发行业精英群711501594 摘要 大数据具
  • Leveldb源码分析--13

    8 FilterPolicy Bloom之2 8 5 构建FilterBlock 8 5 1 FilterBlockBuilder 了解了filter机制 现在来看看filter block的构建 这就是类FilterBlockBuilde
  • LevelDB源码阅读-key

    levelDB中的key 前言 在levelDB中有五种不同的key 在正式分析memtable之前我们先介绍一下这5中不同的key user key ParsedInternalKey InternalKey LookupKey Memt
  • LevelDB.NET 使用

    LevelDB是google实现的非常高效的kv数据库 多数是和redis比较 这里记录下如何使用 新建项目 Nuget添加类库 通过反编译发现运行时是 NET 4 0 这里我用4 5测试需要选择64位平台 代码 写数据 db Put Wr
  • Windows下 VS2015编译RocksDB

    Windows下 VS2015编译RocksDB VS2015编译RocksDB RocksDB 是一个来自 facebook 的可嵌入式的支持持久化的 key value 存储系统 也可作为 C S 模式下的存储数据库 但主要目的还是嵌入
  • leveldb注释7–key与value

    作为一个kv的系统 key的存储至关重要 在leveldb中 主要涉及到如下几个key user key InternalKey与LookupKey memtable key 其关系构成如下图 user key就是用户输入的key 而Int
  • 来自 HTTP 客户端的 PUT 请求后收到“411 Length required”

    我正在开发一个实现 HTTP 客户端的 Java 程序 我测试它向服务器发送请求 GET POST 和 DELETE 请求工作正常 例如 在 POST 请求之后我得到一个输出 Data extracted status message ok
  • 使用 C 的原始 libcurl JSON PUT 请求

    我目前正在编写一个类似 REST 的客户端 只需要执行 PUT 请求 Problem 运行该程序并没有在 URL 的 API 上给出正确的结果 我不知道为什么 使用curl easy perform curl 在调用时不会抛出错误 但 UR
  • Java中的PUT请求[关闭]

    Closed 这个问题需要调试细节 help minimal reproducible example 目前不接受答案 我正在尝试用 Java 发送 PUT 请求 其中 header 和 body 为 json 格式 正文需要具有特定名称
  • Slim PUT 返回 NULL

    我对 Slim 框架和 PUT 请求有疑问 我有一个小的 jQuery 脚本 它将在单击按钮时更新到期时间 expiry button click function event event preventDefault ajax url h
  • 在 IIS 6.0 中运行时,PUT 和 DELETE 在 WCF REST 服务 .net 4 中获取 404

    我已添加 c windows microsoft net framework v4 0 30319 aspnet isapi dll 的通配符应用程序映射 并取消选中 验证该文件存在 复选框 这给了我 GET 和 POST 能力 但没有 P
  • Laravel 表单 html,带有 PUT 路由的 PUT 方法

    我的路线中有这个 Domain URI Name Action
  • 具有包罗万象的路由的 ASP.NET Web API PUT

    使用新的 Web api 是否可以使用像这样的 catch all 路由 routes MapHttpRoute name name routeTemplate api id defaults new controller mycontro
  • 如何在 Nginx 服务器上允许 PUT 文件请求?

    我正在使用一个需要的应用程序PUTHTTP 服务器上的文件 我使用 Nginx 作为服务器 但得到了一个405 Not Allowed错误返回 以下是使用 cURL 进行测试的示例 curl X PUT H Content Type app
  • HTTP 动词 - 何时使用 GET/POST/PUT/Delete

    当您从事 RESTFUL 服务时 您经常会听到这些术语GET POST PUT DELETE 我的问题是这么多动词背后的想法是什么 我可以在以下人的帮助下实现一切GET动词或者如果我想在消息正文中发布一些大数据 我可以使用POST动词 我认
  • 用于 LIFX 电源开/关的 PHP HTTP CURL PUT 请求

    我正在尝试使用 PHP 打开 关闭所有 Lifx 灯泡 API 文档 http developer lifx com http developer lifx com 表示使用 PUT 请求 curl u c87c73a896b554367f

随机推荐