RocksDB Code Study: Write Flow, Part 1 (WriteBatch Writes, WriteThread Scheduling Writers)

2023-11-17

1. Several related classes used in the write flow

1.Slice

// Slice is a lightweight container for data. It has just two member
// variables, data_ and size_ (holding a key's or value's bytes and their
// length), plus a few helper functions.
class Slice {
 public:
  // Create an empty slice.
  Slice() : data_(""), size_(0) { }
  // Create a slice that refers to d[0,n-1].
  Slice(const char* d, size_t n) : data_(d), size_(n) { }
  // Create a slice that refers to the contents of "s"
  /* implicit */
  Slice(const std::string& s) : data_(s.data()), size_(s.size()) { }
  // Create a slice that refers to s[0,strlen(s)-1]
  /* implicit */
  Slice(const char* s) : data_(s), size_(strlen(s)) { }
  // Create a single slice from SliceParts using buf as storage.
  // buf must exist as long as the returned Slice exists.
  Slice(const struct SliceParts& parts, std::string* buf);
  // Return a pointer to the beginning of the referenced data
  const char* data() const { return data_; }
  // Return the length (in bytes) of the referenced data
  size_t size() const { return size_; }
  // Return true iff the length of the referenced data is zero
  bool empty() const { return size_ == 0; }
  // Return the ith byte in the referenced data.
  // REQUIRES: n < size()
  char operator[](size_t n) const {
    assert(n < size());
    return data_[n];
  }
  // Change this slice to refer to an empty array
  void clear() { data_ = ""; size_ = 0; }
  // Drop the first "n" bytes from this slice.
  void remove_prefix(size_t n) {
    assert(n <= size());
    data_ += n;
    size_ -= n;
  }
  // Return a string that contains the copy of the referenced data.
  std::string ToString(bool hex = false) const;
  // Three-way comparison.  Returns value:
  //   <  0 iff "*this" <  "b",
  //   == 0 iff "*this" == "b",
  //   >  0 iff "*this" >  "b"
  int compare(const Slice& b) const;
  // Return true iff "x" is a prefix of "*this"
  bool starts_with(const Slice& x) const {
    return ((size_ >= x.size_) &&
            (memcmp(data_, x.data_, x.size_) == 0));
  }
  // Compare two slices and return the offset of the first byte where they differ
  size_t difference_offset(const Slice& b) const;
 // private: make these public for rocksdbjni access
  const char* data_;
  size_t size_;
  // Intentionally copyable
};
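A quick illustration of Slice semantics (a minimal sketch): a Slice does not own or copy the bytes it points at, so the backing storage must outlive it.

#include <cassert>
#include <string>
#include "rocksdb/slice.h"

int main() {
  std::string backing = "user:1234";  // the Slice only points into this
  rocksdb::Slice s(backing);          // implicit conversion from std::string
  assert(s.starts_with("user:"));
  s.remove_prefix(5);                 // s now refers to "1234"
  assert(s.size() == 4);
  assert(s.ToString() == "1234");     // ToString() copies the bytes out
  return 0;
}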

2.WriteOptions

struct WriteOptions {
  // Default: false
  bool sync;  // whether to sync (fsync/fdatasync) the WAL before returning

  // If true, writes will not first go to the write ahead log,
  // and the write may get lost after a crash.
  bool disableWAL;  // whether to skip writing the write-ahead log

  // The option is deprecated. It's not used anymore.
  uint64_t timeout_hint_us;  // deadline hint for completing this write (unused)

  // If true and if user is trying to write to column families that don't exist
  // (they were dropped),  ignore the write (don't return an error). If there
  // are multiple writes in a WriteBatch, other writes will succeed.
  // Default: false
  bool ignore_missing_column_families;

  WriteOptions()
      : sync(false),
        disableWAL(false),
        timeout_hint_us(0),
        ignore_missing_column_families(false) {}
};
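The two fields that matter most in the write flow below are sync and disableWAL: sync trades latency for durability, disableWAL trades durability for speed. A minimal sketch:

#include "rocksdb/options.h"

rocksdb::WriteOptions MakeDurableWriteOptions() {
  rocksdb::WriteOptions w;
  w.sync = true;         // fsync/fdatasync the WAL before the write returns
  w.disableWAL = false;  // keep the write-ahead log for crash recovery
  return w;
}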

3.WriteBatch


//When writing, RocksDB batches updates through the WriteBatch class. A
//WriteBatch has a single member variable: a string holding a sequence of
//records serialized in a fixed format; to read the records back, they are
//parsed out one by one in that same format.
//That member, rep_, stores every record of the batch, laid out as follows:
// WriteBatch::rep_ :=
//    sequence: fixed64
//    count: fixed32
//    data: record[count]
// record :=
//    kTypeValue varstring varstring
//    kTypeDeletion varstring
//    kTypeSingleDeletion varstring
//    kTypeMerge varstring varstring
//    kTypeColumnFamilyValue varint32 varstring varstring
//    kTypeColumnFamilyDeletion varint32 varstring varstring
//    kTypeColumnFamilySingleDeletion varint32 varstring varstring
//    kTypeColumnFamilyMerge varint32 varstring varstring
// varstring :=
//    len: varint32
//    data: uint8[len]
//The string starts with a 12-byte header: an 8-byte sequence number followed
//by a 4-byte record count, so the class defines
//static const size_t kHeader = 12
//as the string's minimum length. After the header come the records, one
//after another.
//An insert record is kTypeValue + key length + key + value length + value.
//A delete record is kTypeDeletion + key length + key.
//The Put/Delete operations of this class are implemented by calling the
//methods of WriteBatchInternal.
class WriteBatch : public WriteBatchBase {
 public:
  explicit WriteBatch(size_t reserved_bytes = 0);
  ~WriteBatch();

  using WriteBatchBase::Put;
  // Store the mapping "key->value" in the database.
  void Put(ColumnFamilyHandle* column_family, const Slice& key,
           const Slice& value) override;
  void Put(const Slice& key, const Slice& value) override {
    Put(nullptr, key, value);
  }

  // Variant of Put() that gathers output like writev(2).  The key and value
  // that will be written to the database are concatenations of arrays of
  // slices.
  void Put(ColumnFamilyHandle* column_family, const SliceParts& key,
           const SliceParts& value) override;
  void Put(const SliceParts& key, const SliceParts& value) override {
    Put(nullptr, key, value);
  }

  using WriteBatchBase::Delete;
  // If the database contains a mapping for "key", erase it.  Else do nothing.
  void Delete(ColumnFamilyHandle* column_family, const Slice& key) override;
  void Delete(const Slice& key) override { Delete(nullptr, key); }

  // variant that takes SliceParts
  void Delete(ColumnFamilyHandle* column_family,
              const SliceParts& key) override;
  void Delete(const SliceParts& key) override { Delete(nullptr, key); }

  using WriteBatchBase::SingleDelete;
  // If the database contains a mapping for "key", erase it. Expects that the
  // key was not overwritten. Else do nothing.
  void SingleDelete(ColumnFamilyHandle* column_family,
                    const Slice& key) override;
  void SingleDelete(const Slice& key) override { SingleDelete(nullptr, key); }

  // variant that takes SliceParts
  void SingleDelete(ColumnFamilyHandle* column_family,
                    const SliceParts& key) override;
  void SingleDelete(const SliceParts& key) override {
    SingleDelete(nullptr, key);
  }
  // ... (Merge, PutLogData, Iterate, and other members omitted)
};
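To make the rep_ layout concrete, here is a small sketch that builds a batch and reads the 12-byte header back out of WriteBatch::Data(). It assumes a little-endian platform, since the fixed64/fixed32 fields are stored little-endian:

#include <cstdint>
#include <cstring>
#include <iostream>
#include "rocksdb/write_batch.h"

int main() {
  rocksdb::WriteBatch batch;
  batch.Put("k1", "v1");
  batch.Delete("k2");

  const std::string& rep = batch.Data();  // the raw rep_ string
  uint64_t seq;                           // fixed64 sequence number
  uint32_t count;                         // fixed32 record count
  std::memcpy(&seq, rep.data(), sizeof(seq));
  std::memcpy(&count, rep.data() + 8, sizeof(count));
  // seq stays 0 until WriteImpl assigns one at write time; count is 2 here
  std::cout << "sequence=" << seq << " count=" << count
            << " bytes=" << rep.size() << std::endl;
  return 0;
}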

4.WriteBatchInternal

This class manipulates the WriteBatch's rep_ string: getting/setting the sequence number, getting/setting the record count, inserting a WriteBatch into the memtables, and so on. All of its member functions are declared static:

class WriteBatchInternal {
 public:
  // WriteBatch methods with column_family_id instead of ColumnFamilyHandle*
  static void Put(WriteBatch* batch, uint32_t column_family_id,
                  const Slice& key, const Slice& value);

  static void Put(WriteBatch* batch, uint32_t column_family_id,
                  const SliceParts& key, const SliceParts& value);

  static void Delete(WriteBatch* batch, uint32_t column_family_id,
                     const SliceParts& key);

  static void Delete(WriteBatch* batch, uint32_t column_family_id,
                     const Slice& key);

  static void SingleDelete(WriteBatch* batch, uint32_t column_family_id,
                           const SliceParts& key);

  static void SingleDelete(WriteBatch* batch, uint32_t column_family_id,
                           const Slice& key);

  static void Merge(WriteBatch* batch, uint32_t column_family_id,
                    const Slice& key, const Slice& value);

  static void Merge(WriteBatch* batch, uint32_t column_family_id,
                    const SliceParts& key, const SliceParts& value);

  // Return the number of entries in the batch.
  static int Count(const WriteBatch* batch);

  // Set the count for the number of entries in the batch.
  static void SetCount(WriteBatch* batch, int n);

  // Return the sequence number for the start of this batch.
  static SequenceNumber Sequence(const WriteBatch* batch);

  // Store the specified number as the sequence number for the start of
  // this batch.
  static void SetSequence(WriteBatch* batch, SequenceNumber seq);

  // Returns the offset of the first entry in the batch.
  // This offset is only valid if the batch is not empty.
  static size_t GetFirstOffset(WriteBatch* batch);

  static Slice Contents(const WriteBatch* batch) {
    return Slice(batch->rep_);
  }

  static size_t ByteSize(const WriteBatch* batch) {
    return batch->rep_.size();
  }

  static void SetContents(WriteBatch* batch, const Slice& contents);

  // Inserts batch entries into memtable
  // If dont_filter_deletes is false AND options.filter_deletes is true,
  // then --> Drops deletes in batch if db->KeyMayExist returns false
  // If ignore_missing_column_families == true, a WriteBatch referencing a
  // non-existing column family should be ignored.
  // However, if ignore_missing_column_families == false, any WriteBatch
  // referencing a non-existing column family will return an InvalidArgument()
  // failure.
  //
  // If log_number is non-zero, the memtable will be updated only if
  // memtables->GetLogNumber() >= log_number
  static Status InsertInto(const WriteBatch* batch,
                           ColumnFamilyMemTables* memtables,
                           bool ignore_missing_column_families = false,
                           uint64_t log_number = 0, DB* db = nullptr,
                           const bool dont_filter_deletes = true);

  static void Append(WriteBatch* dst, const WriteBatch* src);
};
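Append() is the operation the write flow below relies on when a batch-group leader combines several batches into one. A hypothetical stand-alone sketch of what it has to do, based only on the rep_ layout documented above (12-byte header, then records back-to-back; little-endian assumed) — not the real implementation:

#include <cstdint>
#include <cstring>
#include <string>

static const size_t kHeader = 12;  // 8-byte sequence + 4-byte count

uint32_t GetCount(const std::string& rep) {
  uint32_t c;
  std::memcpy(&c, rep.data() + 8, sizeof(c));  // count sits after the fixed64
  return c;
}

void SetCount(std::string* rep, uint32_t c) {
  std::memcpy(&(*rep)[8], &c, sizeof(c));
}

// Append src's records to dst and bump dst's count. The sequence number is
// left alone: it is assigned later, at write time, for the whole group.
void AppendBatch(std::string* dst, const std::string& src) {
  SetCount(dst, GetCount(*dst) + GetCount(src));
  dst->append(src.data() + kHeader, src.size() - kHeader);
}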

5.MemTableInserter

// This class adds regular key-value records and deletion records to the
// memtable. An instance of it is passed as the handler to the rep_ parsing
// function (WriteBatch::Iterate).
class MemTableInserter : public WriteBatch::Handler {
 public:
  SequenceNumber sequence_;
  ColumnFamilyMemTables* cf_mems_;
  bool ignore_missing_column_families_;
  uint64_t log_number_;
  DBImpl* db_;
  const bool dont_filter_deletes_;

  MemTableInserter(SequenceNumber sequence, ColumnFamilyMemTables* cf_mems,
                   bool ignore_missing_column_families, uint64_t log_number,
                   DB* db, const bool dont_filter_deletes)
      : sequence_(sequence),
        cf_mems_(cf_mems),
        ignore_missing_column_families_(ignore_missing_column_families),
        log_number_(log_number),
        db_(reinterpret_cast<DBImpl*>(db)),
        dont_filter_deletes_(dont_filter_deletes) {
    assert(cf_mems);
    if (!dont_filter_deletes_) {
      assert(db_);
    }
  }

  bool SeekToColumnFamily(uint32_t column_family_id, Status* s) {
    // We are only allowed to call this from a single-threaded write thread
    // (or while holding DB mutex)
    bool found = cf_mems_->Seek(column_family_id);
    if (!found) {
      if (ignore_missing_column_families_) {
        *s = Status::OK();
      } else {
        *s = Status::InvalidArgument(
            "Invalid column family specified in write batch");
      }
      return false;
    }
    if (log_number_ != 0 && log_number_ < cf_mems_->GetLogNumber()) {
      // This is true only in recovery environment (log_number_ is always 0 in
      // non-recovery, regular write code-path)
      // * If log_number_ < cf_mems_->GetLogNumber(), this means that column
      // family already contains updates from this log. We can't apply updates
      // twice because of update-in-place or merge workloads -- ignore the
      // update
      *s = Status::OK();
      return false;
    }
    return true;
  }
  virtual Status PutCF(uint32_t column_family_id, const Slice& key,
                       const Slice& value) override {
    Status seek_status;
    // if the given ColumnFamily cannot be found, return immediately
    if (!SeekToColumnFamily(column_family_id, &seek_status)) {
      ++sequence_;
      return seek_status;
    }
    // fetch the memtable
    MemTable* mem = cf_mems_->GetMemTable();
    auto* moptions = mem->GetMemTableOptions();
    // if in-place update is not supported by this memtable, just add the record
    if (!moptions->inplace_update_support) {
      mem->Add(sequence_, kTypeValue, key, value);
      // otherwise update the record in place
    } else if (moptions->inplace_callback == nullptr) {
      mem->Update(sequence_, key, value);
      RecordTick(moptions->statistics, NUMBER_KEYS_UPDATED);
    } else {
      // or update it through the user callback
      if (mem->UpdateCallback(sequence_, key, value)) {
      } else {
        // if the key is not found in the memtable, fetch it from the SSTs,
        // then update and add
        // key not found in memtable. Do sst get, update, add
        SnapshotImpl read_from_snapshot;
        read_from_snapshot.number_ = sequence_;
        ReadOptions ropts;
        ropts.snapshot = &read_from_snapshot;

        std::string prev_value;
        std::string merged_value;

        auto cf_handle = cf_mems_->GetColumnFamilyHandle();
        if (cf_handle == nullptr) {
          cf_handle = db_->DefaultColumnFamily();
        }
        // call DB::Get, as of the snapshot taken above, to fetch the key's previous value
        Status s = db_->Get(ropts, cf_handle, key, &prev_value);

        char* prev_buffer = const_cast<char*>(prev_value.c_str());
        uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
        auto status = moptions->inplace_callback(s.ok() ? prev_buffer : nullptr,
                                                 s.ok() ? &prev_size : nullptr,
                                                 value, &merged_value);
        if (status == UpdateStatus::UPDATED_INPLACE) {
          // prev_value is updated in-place with final value.
          mem->Add(sequence_, kTypeValue, key, Slice(prev_buffer, prev_size));
          RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
        } else if (status == UpdateStatus::UPDATED) {
          // merged_value contains the final value.
          mem->Add(sequence_, kTypeValue, key, Slice(merged_value));
          RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
        }
      }
    }
    // Since all Puts are logged in transaction logs (if enabled), always bump
    // the sequence number, even if the update eventually fails and does not
    // result in a memtable add/update.
    sequence_++;
    cf_mems_->CheckMemtableFull();
    return Status::OK();
  }

  virtual Status DeleteCF(uint32_t column_family_id,
                          const Slice& key) override {
    Status seek_status;
    if (!SeekToColumnFamily(column_family_id, &seek_status)) {
      ++sequence_;
      return seek_status;
    }
    MemTable* mem = cf_mems_->GetMemTable();
    auto* moptions = mem->GetMemTableOptions();
    if (!dont_filter_deletes_ && moptions->filter_deletes) {
      SnapshotImpl read_from_snapshot;
      read_from_snapshot.number_ = sequence_;
      ReadOptions ropts;
      ropts.snapshot = &read_from_snapshot;
      std::string value;
      auto cf_handle = cf_mems_->GetColumnFamilyHandle();
      if (cf_handle == nullptr) {
        cf_handle = db_->DefaultColumnFamily();
      }
      if (!db_->KeyMayExist(ropts, cf_handle, key, &value)) {
        RecordTick(moptions->statistics, NUMBER_FILTERED_DELETES);
        return Status::OK();
      }
    }
    mem->Add(sequence_, kTypeDeletion, key, Slice());
    sequence_++;
    cf_mems_->CheckMemtableFull();
    return Status::OK();
  }

  virtual Status SingleDeleteCF(uint32_t column_family_id,
                                const Slice& key) override {
    Status seek_status;
    if (!SeekToColumnFamily(column_family_id, &seek_status)) {
      ++sequence_;
      return seek_status;
    }
    MemTable* mem = cf_mems_->GetMemTable();
    auto* moptions = mem->GetMemTableOptions();
    if (!dont_filter_deletes_ && moptions->filter_deletes) {
      SnapshotImpl read_from_snapshot;
      read_from_snapshot.number_ = sequence_;
      ReadOptions ropts;
      ropts.snapshot = &read_from_snapshot;
      std::string value;
      auto cf_handle = cf_mems_->GetColumnFamilyHandle();
      if (cf_handle == nullptr) {
        cf_handle = db_->DefaultColumnFamily();
      }
      if (!db_->KeyMayExist(ropts, cf_handle, key, &value)) {
        RecordTick(moptions->statistics, NUMBER_FILTERED_DELETES);
        return Status::OK();
      }
    }
    mem->Add(sequence_, kTypeSingleDeletion, key, Slice());
    sequence_++;
    cf_mems_->CheckMemtableFull();
    return Status::OK();
  }

  virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
                         const Slice& value) override {
    Status seek_status;
    if (!SeekToColumnFamily(column_family_id, &seek_status)) {
      ++sequence_;
      return seek_status;
    }
    MemTable* mem = cf_mems_->GetMemTable();
    auto* moptions = mem->GetMemTableOptions();
    bool perform_merge = false;

    if (moptions->max_successive_merges > 0 && db_ != nullptr) {
      LookupKey lkey(key, sequence_);

      // Count the number of successive merges at the head
      // of the key in the memtable
      size_t num_merges = mem->CountSuccessiveMergeEntries(lkey);

      if (num_merges >= moptions->max_successive_merges) {
        perform_merge = true;
      }
    }

    if (perform_merge) {
      // 1) Get the existing value
      std::string get_value;

      // Pass in the sequence number so that we also include previous merge
      // operations in the same batch.
      SnapshotImpl read_from_snapshot;
      read_from_snapshot.number_ = sequence_;
      ReadOptions read_options;
      read_options.snapshot = &read_from_snapshot;

      auto cf_handle = cf_mems_->GetColumnFamilyHandle();
      if (cf_handle == nullptr) {
        cf_handle = db_->DefaultColumnFamily();
      }
      db_->Get(read_options, cf_handle, key, &get_value);
      Slice get_value_slice = Slice(get_value);

      // 2) Apply this merge
      auto merge_operator = moptions->merge_operator;
      assert(merge_operator);

      std::deque<std::string> operands;
      operands.push_front(value.ToString());
      std::string new_value;
      bool merge_success = false;
      {
        StopWatchNano timer(Env::Default(), moptions->statistics != nullptr);
        PERF_TIMER_GUARD(merge_operator_time_nanos);
        merge_success = merge_operator->FullMerge(
            key, &get_value_slice, operands, &new_value, moptions->info_log);
        RecordTick(moptions->statistics, MERGE_OPERATION_TOTAL_TIME,
                   timer.ElapsedNanos());
      }

      if (!merge_success) {
          // Failed to merge!
        RecordTick(moptions->statistics, NUMBER_MERGE_FAILURES);

        // Store the delta in memtable
        perform_merge = false;
      } else {
        // 3) Add value to memtable
        mem->Add(sequence_, kTypeValue, key, new_value);
      }
    }

    if (!perform_merge) {
      // Add merge operator to memtable
      mem->Add(sequence_, kTypeMerge, key, value);
    }

    sequence_++;
    cf_mems_->CheckMemtableFull();
    return Status::OK();
  }
};
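MemTableInserter is just one WriteBatch::Handler. WriteBatch::Iterate() parses rep_ record by record and invokes one handler callback per record, so any handler can replay a batch. A minimal sketch of a custom handler (assuming the classic non-column-family Handler callbacks):

#include "rocksdb/write_batch.h"

class CountingHandler : public rocksdb::WriteBatch::Handler {
 public:
  int puts = 0;
  int deletes = 0;
  void Put(const rocksdb::Slice& /*key*/,
           const rocksdb::Slice& /*value*/) override { ++puts; }
  void Delete(const rocksdb::Slice& /*key*/) override { ++deletes; }
};

// Usage:
//   rocksdb::WriteBatch batch;
//   batch.Put("a", "1");
//   batch.Delete("b");
//   CountingHandler h;
//   batch.Iterate(&h);   // afterwards h.puts == 1 and h.deletes == 1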




2. The write flow

rocksdb_put(db, writeoptions, key, strlen(key), value, strlen(value) + 1, &err);
// which calls
SaveError(errptr, db->rep->Put(options->rep, Slice(key, keylen), Slice(val, vallen)));
// which calls
db->rep->Put(options->rep, Slice(key, keylen), Slice(val, vallen))
Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
               const Slice& key, const Slice& value) {
  // Pre-allocate the write batch size conservatively: 8 bytes for the
  // sequence-number header, 4 bytes for the count, 1 byte for the type tag,
  // and 11 extra bytes to cover the key length and value length.
  WriteBatch batch(key.size() + value.size() + 24);  // build the batch
  batch.Put(column_family, key, value);
  return Write(opt, &batch);  // write the batch
}
// First, the WriteBatch is populated:
void WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
                     const Slice& value) {
  WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key, value);
}
// which actually calls WriteBatchInternal::Put
void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
                             const Slice& key, const Slice& value) {
  // increment the batch's record count
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  // append the record type tag (plus the column family id for non-default CFs)
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeValue));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);    // append the key's length and bytes
  PutLengthPrefixedSlice(&b->rep_, value);  // append the value's length and bytes
}
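The two helpers used above come from util/coding.h. A sketch of what they do (equivalent in effect to the real functions, using std::string in place of Slice for brevity): a varint32 stores 7 bits per byte with the high bit flagging a continuation, and a length-prefixed slice is that varint followed by the raw bytes. So, assuming kTypeValue is the byte 0x01, Put(nullptr, "foo", "bar") appends 0x01 0x03 'f' 'o' 'o' 0x03 'b' 'a' 'r' to rep_.

#include <cstdint>
#include <string>

void PutVarint32(std::string* dst, uint32_t v) {
  while (v >= 0x80) {  // 7 payload bits per byte, high bit means "more follows"
    dst->push_back(static_cast<char>((v & 0x7f) | 0x80));
    v >>= 7;
  }
  dst->push_back(static_cast<char>(v));
}

void PutLengthPrefixedSlice(std::string* dst, const std::string& value) {
  PutVarint32(dst, static_cast<uint32_t>(value.size()));  // length varint
  dst->append(value);                                     // then the raw bytes
}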
// Then the WriteBatch is written:
virtual Status Write(const WriteOptions& options, WriteBatch* updates) = 0;
// which resolves to:
Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
  return WriteImpl(write_options, my_batch, nullptr);
}
// The write is actually implemented here:
Status DBImpl::WriteImpl(const WriteOptions& write_options,
                         WriteBatch* my_batch, WriteCallback* callback) {
  if (my_batch == nullptr) {
    return Status::Corruption("Batch is nullptr!");
  }
  if (write_options.timeout_hint_us != 0) {
    return Status::InvalidArgument("timeout_hint_us is deprecated");
  }

  Status status;
  bool callback_failed = false;

  bool xfunc_attempted_write = false;
  /* first give the cross-functional test hook a chance to attempt the write */
  XFUNC_TEST("transaction", "transaction_xftest_write_impl",
             xf_transaction_write1, xf_transaction_write, write_options,
             db_options_, my_batch, callback, this, &status,
             &xfunc_attempted_write);
  if (xfunc_attempted_write) {
    // Test already did the write
    return status;
  }

  PERF_TIMER_GUARD(write_pre_and_post_process_time);
  WriteThread::Writer w;  // construct the write task
  w.batch = my_batch;     // the data to write
  w.sync = write_options.sync;  // whether to fsync/fdatasync the WAL
  w.disableWAL = write_options.disableWAL;  // whether to skip the WAL
  w.in_batch_group = false;
  // in_batch_group is the interesting one. Internally, RocksDB batches user
  // writes as aggressively as it can, using a queue of WriteThread::Writer*
  // inside write_thread_. When the task at the head of the queue is about to
  // run, BuildBatchGroup() tries to fold the tasks queued right behind it
  // into one BatchGroup that is written to the database in a single shot.
  w.done = false;  // set when the write has completed
  w.has_callback = (callback != nullptr) ? true : false;

  if (!write_options.disableWAL) {
    // record the number of Write calls that request WAL
    RecordTick(stats_, WRITE_WITH_WAL);
  }

  // start a stopwatch for write latency; used for DB statistics
  StopWatch write_sw(env_, db_options_.statistics.get(), DB_WRITE);

  // Enqueue the write task @w on the write queue and sleep on mutex_ until:
  // 1) the write's timeout expires (if one was set), or
  // 2) all tasks ahead of @w have finished and @w is at the head of the
  //    queue, or
  // 3) @w's write was completed by another writer thread.
  // Case 3 means the task was merged into an earlier writer's
  // WriteBatchGroup; @w is then marked in_batch_group. Interestingly, if
  // JoinBatchGroup() wakes up on a timeout and finds in_batch_group set, it
  // keeps waiting, because another thread has already picked the task up and
  // is about to write it to the database.

  write_thread_.JoinBatchGroup(&w);  // enqueue the Writer carrying our batch
  if (w.done) {
    // write was done by someone else, no need to grab mutex
    RecordTick(stats_, WRITE_DONE_BY_OTHER);
    return w.status;
  }
  // else we are the leader of the write batch group

  WriteContext context;
  mutex_.Lock();


  // if the WAL is enabled
  if (!write_options.disableWAL) {
    default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_WITH_WAL, 1);
  }

  // this writer performs its own batch write (WRITE_DONE_BY_SELF)
  RecordTick(stats_, WRITE_DONE_BY_SELF);
  default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1);

  // Once reaches this point, the current writer "w" will try to do its write
  // job.  It may also pick up some of the remaining writers in the "writers_"
  // when it finds suitable, and finish them in the same write batch.
  // This is how a write job could be done by the other writer.
  assert(!single_column_family_mode_ ||
         versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);


  // compute the maximum total WAL size
  uint64_t max_total_wal_size = (db_options_.max_total_wal_size == 0)
                                    ? 4 * max_total_in_memory_state_
                                    : db_options_.max_total_wal_size;

  if (UNLIKELY(!single_column_family_mode_) &&
      alive_log_files_.begin()->getting_flushed == false &&
      total_log_size_ > max_total_wal_size) {
    // With multiple column families, if the memtables backed by the oldest
    // live WAL have not yet been flushed and the total log size exceeds the
    // configured maximum, allocate new memtables and write the old immutable
    // memtables to disk.
    uint64_t flush_column_family_if_log_file = alive_log_files_.begin()->number;  // number of the oldest live WAL
    alive_log_files_.begin()->getting_flushed = true;
    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
        "Flushing all column families with data in WAL number %" PRIu64
        ". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64,
        flush_column_family_if_log_file, total_log_size_, max_total_wal_size);
    // no need to refcount because drop is happening in write thread, so can't
    // happen while we're in the write thread
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->IsDropped()) {
        continue;
      }
      // any column family whose log number is <= the oldest live WAL's number
      // should switch to a new memtable
      if (cfd->GetLogNumber() <= flush_column_family_if_log_file) {
        // allocate a new memtable for this column family
        status = SwitchMemtable(cfd, &context);
        if (!status.ok()) {
          break;
        }
        cfd->imm()->FlushRequested();
          // schedule the pending flush
        SchedulePendingFlush(cfd);
      }
    }

      // schedule a flush or compaction
    MaybeScheduleFlushOrCompaction();
  }
  /* otherwise, check whether the write buffer needs a flush */
  else if (UNLIKELY(write_buffer_.ShouldFlush())) {
    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
        "Flushing all column families. Write buffer is using %" PRIu64
        " bytes out of a total of %" PRIu64 ".",
        write_buffer_.memory_usage(), write_buffer_.buffer_size());
    // no need to refcount because drop is happening in write thread, so can't
    // happen while we're in the write thread
    // flush the column families in the current version
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->IsDropped()) {
        continue;
      }
      if (!cfd->mem()->IsEmpty()) {
        status = SwitchMemtable(cfd, &context);
        if (!status.ok()) {
          break;
        }
        cfd->imm()->FlushRequested();
        SchedulePendingFlush(cfd);
      }
    }
      // schedule a flush or compaction
    MaybeScheduleFlushOrCompaction();
  }

  if (UNLIKELY(status.ok() && !bg_error_.ok())) {
    status = bg_error_;
  }
    /* flush_scheduler_ is not empty */
  if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
    status = ScheduleFlushes(&context);
  }

    /* write_controller_ decides whether writes must be stopped or delayed */
  if (UNLIKELY(status.ok()) &&
      (write_controller_.IsStopped() || write_controller_.NeedsDelay())) {
    PERF_TIMER_STOP(write_pre_and_post_process_time);
    PERF_TIMER_GUARD(write_delay_time);
    // We don't know the size of the current batch, so we always use the size
    // of the previous one. This might create a fairness issue: expiration
    // might happen for smaller writes while larger writes can go through.
    // Can optimize it if it becomes an issue.
    status = DelayWrite(last_batch_group_size_);
    PERF_TIMER_START(write_pre_and_post_process_time);
  }

  uint64_t last_sequence = versions_->LastSequence();
  WriteThread::Writer* last_writer = &w;
  autovector<WriteBatch*> write_batch_group;
  /* whether the WAL, and the WAL directory, need to be synced */
  bool need_log_sync = !write_options.disableWAL && write_options.sync;
  bool need_log_dir_sync = need_log_sync && !log_dir_synced_;

  // build the batch group; below we also wait for any in-flight WAL sync
  if (status.ok()) {
    // as the group leader, gather the pending writers into write_batch_group
    last_batch_group_size_ = write_thread_.EnterAsBatchGroupLeader(
        &w, &last_writer, &write_batch_group);

    if (need_log_sync) {
      while (logs_.front().getting_synced) {
        log_sync_cv_.Wait();
      }
      for (auto& log : logs_) {
        assert(!log.getting_synced);
        log.getting_synced = true;
      }
    }

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into memtables

    mutex_.Unlock();

    if (callback != nullptr) {
      // If this write has a validation callback, check to see if this write
      // is able to be written.  Must be called on the write thread.
      status = callback->Callback(this);
      callback_failed = true;
    }
  } else {
    mutex_.Unlock();
  }

  // At this point the mutex is unlocked

  // from here on, the memtable write begins
  if (status.ok()) {
    // Merge the WriteBatches in write_batch_group into a single batch,
    // `updates`, which is what actually gets written to the WAL and the
    // memtables (named differently to distinguish it from my_batch above).
      WriteBatch* updates = nullptr;
      if (write_batch_group.size() == 1) {
        updates = write_batch_group[0];
      } else {
        updates = &tmp_batch_;
        for (size_t i = 0; i < write_batch_group.size(); ++i) {

            // append to the combined batch
          WriteBatchInternal::Append(updates, write_batch_group[i]);
        }
      }
      // the combined batch's sequence number is the version set's last sequence + 1
      const SequenceNumber current_sequence = last_sequence + 1;
      // set the sequence number
      WriteBatchInternal::SetSequence(updates, current_sequence);
      // get the record count
      int my_batch_count = WriteBatchInternal::Count(updates);
      // advance last_sequence by the batch's record count
      last_sequence += my_batch_count;
      const uint64_t batch_size = WriteBatchInternal::ByteSize(updates);
      // Record statistics
      RecordTick(stats_, NUMBER_KEYS_WRITTEN, my_batch_count);
      RecordTick(stats_, BYTES_WRITTEN, batch_size);
      if (write_options.disableWAL) {
        flush_on_destroy_ = true;
      }
      PERF_TIMER_STOP(write_pre_and_post_process_time);

      uint64_t log_size = 0;
      // if the WAL is enabled
      if (!write_options.disableWAL) {
        PERF_TIMER_GUARD(write_wal_time);
        // the log entry is simply the serialized contents of `updates`
        Slice log_entry = WriteBatchInternal::Contents(updates);
        // append log_entry via the current WAL writer
        status = logs_.back().writer->AddRecord(log_entry);
        total_log_size_ += log_entry.size();
        // account for the WAL file size
        alive_log_files_.back().AddSize(log_entry.size());
        log_empty_ = false;
        log_size = log_entry.size();
        RecordTick(stats_, WAL_FILE_BYTES, log_size);

        /* sync the WAL */
        if (status.ok() && need_log_sync) {
          RecordTick(stats_, WAL_FILE_SYNCED);
          StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS);
          // It's safe to access logs_ with unlocked mutex_ here because:
          //  - we've set getting_synced=true for all logs,
          //    so other threads won't pop from logs_ while we're here,
          //  - only writer thread can push to logs_, and we're in
          //    writer thread, so no one will push to logs_,
          //  - as long as other threads don't modify it, it's safe to read
          //    from std::deque from multiple threads concurrently.
          for (auto& log : logs_) {
            status = log.writer->file()->Sync(db_options_.use_fsync);
            if (!status.ok()) {
              break;
            }
          }
          if (status.ok() && need_log_dir_sync) {
            // We only sync WAL directory the first time WAL syncing is
            // requested, so that in case users never turn on WAL sync,
            // we can avoid the disk I/O in the write code path.
            status = directories_.GetWalDir()->Fsync();
          }
        }
      }

      // everything above dealt with the WAL
      if (status.ok()) {
        PERF_TIMER_GUARD(write_memtable_time);

        /**************** now write the WriteBatch into the memtables ***********/
        // InsertInto parses `updates` and applies each record to the memtables
        status = WriteBatchInternal::InsertInto(
            updates, column_family_memtables_.get(),
            write_options.ignore_missing_column_families, 0, this, false);
        // A non-OK status here indicates iteration failure (either in-memory
        // writebatch corruption (very bad), or the client specified invalid
        // column family).  This will later on trigger bg_error_.
        //
        // Note that existing logic was not sound. Any partial failure writing
        // into the memtable would result in a state that some write ops might
        // have succeeded in memtable but Status reports error for all writes.

        SetTickerCount(stats_, SEQUENCE_NUMBER, last_sequence);
      }

      // wrap-up
      PERF_TIMER_START(write_pre_and_post_process_time);
      if (updates == &tmp_batch_) {
        tmp_batch_.Clear();
      }
      mutex_.Lock();

      // internal stats
      default_cf_internal_stats_->AddDBStats(
          InternalStats::BYTES_WRITTEN, batch_size);
      default_cf_internal_stats_->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN,
                                             my_batch_count);
      if (!write_options.disableWAL) {
        if (write_options.sync) {
          default_cf_internal_stats_->AddDBStats(InternalStats::WAL_FILE_SYNCED,
                                                 1);
        }
        default_cf_internal_stats_->AddDBStats(
            InternalStats::WAL_FILE_BYTES, log_size);
      }
      if (status.ok()) {
        versions_->SetLastSequence(last_sequence);
      }
  } else {
    // Operation failed.  Make sure the mutex is held for the cleanup code below.
    mutex_.Lock();
  }

  if (db_options_.paranoid_checks && !status.ok() && !callback_failed &&
      !status.IsBusy() && bg_error_.ok()) {
    bg_error_ = status; // stop compaction & fail any further writes
  }

  mutex_.AssertHeld();

  if (need_log_sync) {
    MarkLogsSynced(logfile_number_, need_log_dir_sync, status);
  }

  uint64_t writes_for_other = write_batch_group.size() - 1;
  if (writes_for_other > 0) {
    default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER,
                                           writes_for_other);
    if (!write_options.disableWAL) {
      default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_WITH_WAL,
                                             writes_for_other);
    }
  }

  mutex_.Unlock();

  write_thread_.ExitAsBatchGroupLeader(&w, last_writer, status);

  return status;
}
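Putting the whole flow together from the application side, a batch write through the C++ API looks like this (a minimal sketch; error handling mostly elided, and /tmp/testdb is an arbitrary path):

#include <cassert>
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/write_batch.h"

int main() {
  rocksdb::DB* db = nullptr;
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/testdb", &db);
  assert(s.ok());

  rocksdb::WriteBatch batch;     // serialized into rep_ as described above
  batch.Put("key1", "value1");
  batch.Delete("key2");

  rocksdb::WriteOptions wopts;
  wopts.sync = true;             // takes the need_log_sync path shown above
  s = db->Write(wopts, &batch);  // enters DBImpl::WriteImpl
  assert(s.ok());

  delete db;
  return 0;
}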

References

http://blog.csdn.net/wang_xijue/article/details/46521605
http://kernelmaker.github.io/Rocksdb_Study_4
