Apache IoTDB 查询引擎源码阅读——数据异步传输模块

2023-05-16

本文参考了 Apache IoTDB 社区成员田原和王中的设计文档,由于飞书链接限制,本文没有贴出参考链接。

背景

Apache IoTDB 查询引擎目前采用 MPP 架构,一条查询 SQL 大致会经历下图几个阶段:

FragmentInstance 是分布式计划被拆分后实际分发到各个节点进行执行的实例。这些被拆分出的 FragmentInstance 逻辑上仍然构成一个树形结构,父亲结点需要子结点的输出作为输入(即下游 FragmentInstance 需要接收上游 FragmentInstance 的输出作为输入)来完成相应的逻辑。

由于 FragmentInstance 可能被分发到不同节点,数据传输需要进行网络通信并且具有依赖关系,需要对 FragmentInstance 之间的数据传输进行管理,因此引入了数据异步传输模块。该模块可以看成是火山模型中 ExchangeOperator 的一种实现方式,使用了生产者消费者模型。

重要概念

  • ISinkHandle:通常每一个 FragmentInstance 持有一个 ISinkHandle,用于向上游 FragmentInstance 异步传输计算结果。

  • ExchangeOperator:FragmentInstance 间数据传输逻辑和 FragmentInstance 的执行逻辑是解耦的,FragmentInstance 的算子树结点可能存在 ExchangeOperator,ExchangeOperator 持有 ISourceHandle,可以从上游获取输入。

  • ISourceHandle:与 ISinkHandle 一一对应,接收 ISinkHandle 的计算结果,传给 ExchangeOperator。

  • MPPDataExchangeManager:当前节点数据传输模块的管理中心,持有线程资源,是 ISinkHandle 和 ISourceHandle 交互的中间站。

具体实现

MPPDataExchangeManager

MppDataExchangeManager 是当前节点数据传输模块的管理中心,有下述职责:

  • 负责创建 ISinkHandle 和 ISourceHandle。FragmentInstance 需要通过 MPPDataExchangeManager 创建 ISinkHandle 和 ISourceHandle,MPPDataExchangeManager 维护了两个 Map。

// FragmentInstance 可能有多个 ExchangeOperator,进而有多个 ISourceHandle
// 因此这里的 Map 是一个两层 Map,即 FragmentInstance -> PlanNodeID -> ISourceHandle
private final Map<TFragmentInstanceId, Map<String, ISourceHandle>> sourceHandles;

// FragmentInstance -> SinkHandle
private final Map<TFragmentInstanceId, ISinkHandle> sinkHandles;
  • 定义了 SinkHandleListener 和 SourceHandleListener。ISinkHandle 和 ISourceHandle 定义了 abort(), close() 等方法, SinkHandleListener 和 SourceHandleListener 会在这些方法里被使用,用于通知相应 FragmentInstance 以及更新前述两个 Map。

  • 实现了 MPPDataExchangeService.Iface。不同节点间通过 Thrift RPC 通信,MPPDataExchangeService.Iface 定义了 SinkHandle 和 SourceHandle 的交互接口,接口具体逻辑将在下文分析。

SinkHandle 和 SourceHandle

SinkHandle 和 SourceHandle 是 ISinkHandle 和 ISourceHandle 的一组实现类,用于不同节点间 FragmentInstance 的数据通信。

SinkHandle 和 SourceHandle 的数据通信主要分为三步:

  1. 每产生一个 TsBlock,SinkHandle 向 SourceHandle 发送一个 NewDataBlockEvent,包含该 TsBlock 的sequenceId 以及所占内存大小(如果再无新的数据产生,则发送一个EndOfDataBlockEvent)。在接收到 SourceHandle 对该 TsBlock 的 ack 之前,保存该 TsBlock。

  1. SourceHandle 收到 NewDataBlockEvent后,在内存中选取一段连续区间的 sequenceId,向 SinkHandle 发起拉取数据的请求。

  1. SourceHandle 拉取到数据后,向 SinkHandle 发送 ack 消息,SinkHandle 收到 ack 消息后,便可以将对应的TsBlock 释放。

首先来看 SinkHandle 发送数据的逻辑(只有 SinkHandle 的 isFull() 返回的 Future 被 complete 后,send 方法才会被调用,具体可以参考 Driver#processInternal):

@Override
public synchronized void send(TsBlock tsBlock) {
  long startTime = System.nanoTime();
  try {
    Validate.notNull(tsBlock, "tsBlocks is null");
    checkState();
    if (!blocked.isDone()) {
      throw new IllegalStateException("Sink handle is blocked.");
    }
    if (noMoreTsBlocks) {
      return;
    }
    long retainedSizeInBytes = tsBlock.getRetainedSizeInBytes();
    int startSequenceId;
    startSequenceId = nextSequenceId;
    blocked =
        localMemoryManager
            .getQueryPool()
            .reserve(
                localFragmentInstanceId.getQueryId(),
                localFragmentInstanceId.getInstanceId(),
                localPlanNodeId,
                retainedSizeInBytes,
                maxBytesCanReserve)
            .left;
    bufferRetainedSizeInBytes += retainedSizeInBytes;

    sequenceIdToTsBlock.put(nextSequenceId, new Pair<>(tsBlock, currentTsBlockSize));
    nextSequenceId += 1;
    currentTsBlockSize = retainedSizeInBytes;

    // TODO: consider merge multiple NewDataBlockEvent for less network traffic.
    submitSendNewDataBlockEventTask(startSequenceId, ImmutableList.of(retainedSizeInBytes));
  } finally {
    QUERY_METRICS.recordDataExchangeCost(
        SINK_HANDLE_SEND_TSBLOCK_REMOTE, System.nanoTime() - startTime);
  }
}
  • SinkHandle 在初始化的时候就会向内存池申请内存,此时会初始化 blocked 这个 Future。

  • 进入 send 方法说明 blocked.isDone() == true,send 并不会直接发送 TsBlock,而是发送 NewDataBlockEventTask,SourceHandle 后续会通过 sequenceId 拉取指定的 TsBlock。

  • send() 用这次发送的 TsBlock 的大小来估计下一次要发送的 TsBlock 的大小,所以 16 -25 行更新 blocked 时使用的是当前 TsBlock 的 retainedSizeInBytes。

下面来看 SourceHandle 拉取 TsBlock 的逻辑,可以直接参考注释:

private synchronized void trySubmitGetDataBlocksTask() {
  if (aborted || closed) {
    return;
  }
  if (blockedOnMemory != null && !blockedOnMemory.isDone()) {
    return;
  }

  final int startSequenceId = nextSequenceId;
  int endSequenceId = nextSequenceId;
  long reservedBytes = 0L;
  Pair<ListenableFuture<Void>, Boolean> pair = null;
  long blockedSize = 0L;
  
  // 选取一段连续的 sequenceId
  while (sequenceIdToDataBlockSize.containsKey(endSequenceId)) {
    Long bytesToReserve = sequenceIdToDataBlockSize.get(endSequenceId);
    if (bytesToReserve == null) {
      throw new IllegalStateException("Data block size is null.");
    }
    // 从内存池申请内存
    pair =
        localMemoryManager
            .getQueryPool()
            .reserve(
                localFragmentInstanceId.getQueryId(),
                localFragmentInstanceId.getInstanceId(),
                localPlanNodeId,
                bytesToReserve,
                maxBytesCanReserve);
    bufferRetainedSizeInBytes += bytesToReserve;
    endSequenceId += 1;
    reservedBytes += bytesToReserve;
    // 没有申请到内存,跳出循环
    if (!pair.right) {
      blockedSize = bytesToReserve;
      break;
    }
  }

  if (pair == null) {
    // Next data block not generated yet. Do nothing.
    return;
  }
  nextSequenceId = endSequenceId;

  // 注册回调函数,在申请内存的 future 被 complete(表明内存被申请到了)时拉取指定 sequenceId 的 TsBlock
  if (!pair.right) {
    endSequenceId--;
    reservedBytes -= blockedSize;
    // The future being not completed indicates,
    //   1. Memory has been reserved for blocks in [startSequenceId, endSequenceId).
    //   2. Memory reservation for block whose sequence ID equals endSequenceId - 1 is blocked.
    //   3. Have not reserve memory for the rest of blocks.
    //
    //  startSequenceId          endSequenceId - 1  endSequenceId
    //         |-------- reserved --------|--- blocked ---|--- not reserved ---|

    // Schedule another call of trySubmitGetDataBlocksTask for the rest of blocks.
    blockedOnMemory = pair.left;
    final int blockedSequenceId = endSequenceId;
    final long blockedRetainedSize = blockedSize;
    blockedOnMemory.addListener(
        () ->
            executorService.submit(
                new GetDataBlocksTask(
                    blockedSequenceId, blockedSequenceId + 1, blockedRetainedSize)),
        executorService);
  }

  if (endSequenceId > startSequenceId) {
    executorService.submit(new GetDataBlocksTask(startSequenceId, endSequenceId, reservedBytes));
  }
}

LocalSinkHandle 和 LocalSourceHandle

LocalSinkHandle 和 LocalSourceHandle 是 ISinkHandle 和 ISourceHandle 的另一组实现类,用于同一节点不同 FragmentInstance 的数据通信。不复用 SinkHandle 和 SourceHandle 是因为同一节点没必要再使用 RPC 通信,可以节省网络开销。

LocalSinkHandle 和 LocalSourceHandle 通过一个共享的阻塞队列 SharedTsBlockQueue 进行通信。

LocalSinkHandle 的发送逻辑(只有 LocalSinkHandle 的 isFull() 返回的 Future 被 complete 后才会发送,直接往 queue 里放 TsBlock):

@Override
public void send(TsBlock tsBlock) {
  long startTime = System.nanoTime();
  try {
    Validate.notNull(tsBlock, "tsBlocks is null");
    synchronized (this) {
      checkState();
      if (!blocked.isDone()) {
        throw new IllegalStateException("Sink handle is blocked.");
      }
    }

    synchronized (queue) {
      if (queue.hasNoMoreTsBlocks()) {
        return;
      }
      logger.debug("[StartSendTsBlockOnLocal]");
      synchronized (this) {
        blocked = queue.add(tsBlock);
      }
    }
  } finally {
    QUERY_METRICS.recordDataExchangeCost(
        SINK_HANDLE_SEND_TSBLOCK_LOCAL, System.nanoTime() - startTime);
  }
}

LocalSourceHandle 的拉取逻辑(只有 isBlocked() 返回的 Future complete 时才会被调用):

@Override
public TsBlock receive() {
  long startTime = System.nanoTime();
  try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
    checkState();

    if (!queue.isBlocked().isDone()) {
      throw new IllegalStateException("Source handle is blocked.");
    }
    TsBlock tsBlock;
    synchronized (queue) {
      tsBlock = queue.remove();
    }
    if (tsBlock != null) {
      logger.debug(
          "[GetTsBlockFromQueue] TsBlock:{} size:{}",
          currSequenceId,
          tsBlock.getRetainedSizeInBytes());
      currSequenceId++;
    }
    checkAndInvokeOnFinished();
    return tsBlock;
  } finally {
    QUERY_METRICS.recordDataExchangeCost(
        SOURCE_HANDLE_GET_TSBLOCK_LOCAL, System.nanoTime() - startTime);
  }
}

LocalSinkHandle 的 send 方法和 LocalSourceHandle 的 receive 方法实现都较为简单,主要通过 SharedTsBlockQueue 进行交互,下面是 SharedTsBlockQueue 的 remove 和 add 方法:

/**
 * Remove a tsblock from the head of the queue and return. Should be invoked only when the future
 * returned by {@link #isBlocked()} completes.
 */
public TsBlock remove() {
  if (closed) {
    throw new IllegalStateException("queue has been destroyed");
  }
  TsBlock tsBlock = queue.remove();
  // Every time LocalSourceHandle consumes a TsBlock, it needs to send the event to
  // corresponding LocalSinkHandle.
  if (sinkHandle != null) {
    sinkHandle.checkAndInvokeOnFinished();
  }
  
  // 释放当前 TsBlock 在 MemoryPool 中占用的内存
  localMemoryManager
      .getQueryPool()
      .free(
          localFragmentInstanceId.getQueryId(),
          localFragmentInstanceId.getInstanceId(),
          localPlanNodeId,
          tsBlock.getRetainedSizeInBytes());
  bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes();
  if (blocked.isDone() && queue.isEmpty() && !noMoreTsBlocks) {
    blocked = SettableFuture.create();
  }
  return tsBlock;
}

/**
 * Add tsblocks to the queue. Except the first invocation, this method should be invoked only when
 * the returned future of last invocation completes.
 */
public ListenableFuture<Void> add(TsBlock tsBlock) {
  if (closed) {
    logger.warn("queue has been destroyed");
    return immediateVoidFuture();
  }

  Validate.notNull(tsBlock, "TsBlock cannot be null");
  Validate.isTrue(blockedOnMemory == null || blockedOnMemory.isDone(), "queue is full");
  Pair<ListenableFuture<Void>, Boolean> pair =
      localMemoryManager
          .getQueryPool()
          .reserve(
              localFragmentInstanceId.getQueryId(),
              localFragmentInstanceId.getInstanceId(),
              localPlanNodeId,
              tsBlock.getRetainedSizeInBytes(),
              maxBytesCanReserve);
  blockedOnMemory = pair.left;
  bufferRetainedSizeInBytes += tsBlock.getRetainedSizeInBytes();

  // reserve memory failed, we should wait until there is enough memory
  if (!pair.right) {
    blockedOnMemory.addListener(
        () -> {
          synchronized (this) {
            queue.add(tsBlock);
            if (!blocked.isDone()) {
              blocked.set(null);
            }
          }
        },
        directExecutor());
  } else { // reserve memory succeeded, add the TsBlock directly
    queue.add(tsBlock);
    if (!blocked.isDone()) {
      blocked.set(null);
    }
  }

  return blockedOnMemory;

MemoryPool

由于采用异步传输机制,SinkHandle 在实际发送数据前需先将计算好的 TsBlock 保留在内存中,SourceHandle 在接收 TsBlock 前也需要先预留内存,为了对数据传输模块占用的内存进行管理,SinkHandle 和 SourceHandle 需要通过 MemoryPool 申请内存。

每个节点持有一个 MemoryPool,大小由配置参数决定:

public class LocalMemoryManager {

  private final MemoryPool queryPool;

  public LocalMemoryManager() {
    queryPool =
        new MemoryPool(
            "query",
            IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForDataExchange(),
            IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance());
  }

  public MemoryPool getQueryPool() {
    return queryPool;
  }
public MemoryPool(String id, long maxBytes, long maxBytesPerFragmentInstance) {
  this.id = Validate.notNull(id);
  Validate.isTrue(maxBytes > 0L, "max bytes should be greater than zero: %d", maxBytes);
  // maxBytes 是整个 pool 最大可以使用的内存容量
  this.maxBytes = maxBytes;
  Validate.isTrue(
      maxBytesPerFragmentInstance > 0L && maxBytesPerFragmentInstance <= maxBytes,
      "max bytes per query should be greater than zero while less than or equal to max bytes. maxBytesPerQuery: %d, maxBytes: %d",
      maxBytesPerFragmentInstance,
      maxBytes);
  // maxBytesPerFragmentInstance 是单个 FragmentInstance 的 ISinkHandle 和 ISourceHandle
  // 占用内存之和的最大值
  this.maxBytesPerFragmentInstance = maxBytesPerFragmentInstance;
}

ISinkHandle 和 ISourceHandle 通过 MemoryPool 的 reserve 方法申请内存,reserve 在判断是否申请成功时进行两层判断:

  • 首先判断申请的内存会不会超过 MemoryPool 最大限制,maxBytes - reservedBytes < bytesToReserve 表明超过限制。

  • 每一个 ISinkHandle/ISourceHandle 能申请的内存也有限制,第二层判断申请的内存会不会超过调用 reserve 方法的 ISinkHandle/ISourceHandle 的限制,即 27 - 32 行逻辑。

如果申请成功,则更新 MemoryPool 已使用的内存以及该 ISinkHandle/ISourceHandle 占用的内存(更新 queryMemoryReservations),然后返回 Futures.immediateFuture(null);

如果申请失败,则创建一个 MemoryReservationFuture,加入维持的 list,当调用 MemoryPool#free 释放内存的时候,会选取 list 中的 future 进行 complete。

下面是 reserve 方法的源码:

/** @return if reserve succeed, pair.right will be true, otherwise false */
public Pair<ListenableFuture<Void>, Boolean> reserve(
    String queryId,
    String fragmentInstanceId,
    String planNodeId,
    long bytesToReserve,
    long maxBytesCanReserve) {
  Validate.notNull(queryId);
  Validate.notNull(fragmentInstanceId);
  Validate.notNull(planNodeId);
  Validate.isTrue(
      bytesToReserve > 0L && bytesToReserve <= maxBytesPerFragmentInstance,
      "bytes should be greater than zero while less than or equal to max bytes per fragment instance: %d",
      bytesToReserve);
  if (bytesToReserve > maxBytesCanReserve) {
    LOGGER.warn(
        "Cannot reserve {} bytes memory from MemoryPool for planNodeId{}",
        bytesToReserve,
        planNodeId);
    throw new IllegalArgumentException(
        "Query is aborted since it requests more memory than can be allocated.");
  }

  ListenableFuture<Void> result;
  synchronized (this) {
    if (maxBytes - reservedBytes < bytesToReserve
        || maxBytesCanReserve
                - queryMemoryReservations
                    .getOrDefault(queryId, Collections.emptyMap())
                    .getOrDefault(fragmentInstanceId, Collections.emptyMap())
                    .getOrDefault(planNodeId, 0L)
            < bytesToReserve) {
      LOGGER.debug(
          "Blocked reserve request: {} bytes memory for planNodeId{}",
          bytesToReserve,
          planNodeId);
      result =
          MemoryReservationFuture.create(
              queryId, fragmentInstanceId, planNodeId, bytesToReserve, maxBytesCanReserve);
      memoryReservationFutures.add((MemoryReservationFuture<Void>) result);
      return new Pair<>(result, Boolean.FALSE);
    } else {
      reservedBytes += bytesToReserve;
      queryMemoryReservations
          .computeIfAbsent(queryId, x -> new HashMap<>())
          .computeIfAbsent(fragmentInstanceId, x -> new HashMap<>())
          .merge(planNodeId, bytesToReserve, Long::sum);
      result = Futures.immediateFuture(null);
      return new Pair<>(result, Boolean.TRUE);
    }
  }
}

调用 MemoryPool#free 时,首先会更新 MemoryPool 占用的内存和 ISinkHanlde/ISourceHandle 占用的内存。

然后会遍历 memoryReservationFutures 查看可以 complete 的 Future:

public void free(String queryId, String fragmentInstanceId, String planNodeId, long bytes) {
  List<MemoryReservationFuture<Void>> futureList = new ArrayList<>();
  synchronized (this) {
    Validate.notNull(queryId);
    Validate.isTrue(bytes > 0L);

    Long queryReservedBytes =
        queryMemoryReservations
            .getOrDefault(queryId, Collections.emptyMap())
            .getOrDefault(fragmentInstanceId, Collections.emptyMap())
            .get(planNodeId);
    Validate.notNull(queryReservedBytes);
    Validate.isTrue(bytes <= queryReservedBytes);

    queryReservedBytes -= bytes;
    if (queryReservedBytes == 0) {
      queryMemoryReservations.get(queryId).get(fragmentInstanceId).remove(planNodeId);
    } else {
      queryMemoryReservations
          .get(queryId)
          .get(fragmentInstanceId)
          .put(planNodeId, queryReservedBytes);
    }
    reservedBytes -= bytes;

    if (memoryReservationFutures.isEmpty()) {
      return;
    }
    Iterator<MemoryReservationFuture<Void>> iterator = memoryReservationFutures.iterator();
    while (iterator.hasNext()) {
      MemoryReservationFuture<Void> future = iterator.next();
      if (future.isCancelled() || future.isDone()) {
        continue;
      }
      long bytesToReserve = future.getBytesToReserve();
      String curQueryId = future.getQueryId();
      String curFragmentInstanceId = future.getFragmentInstanceId();
      String curPlanNodeId = future.getPlanNodeId();
      // check total reserved bytes in memory pool
      if (maxBytes - reservedBytes < bytesToReserve) {
        continue;
      }
      // check total reserved bytes of one Sink/Source handle
      if (future.getMaxBytesCanReserve()
              - queryMemoryReservations
                  .getOrDefault(curQueryId, Collections.emptyMap())
                  .getOrDefault(curFragmentInstanceId, Collections.emptyMap())
                  .getOrDefault(curPlanNodeId, 0L)
          >= bytesToReserve) {
        reservedBytes += bytesToReserve;
        queryMemoryReservations
            .computeIfAbsent(curQueryId, x -> new HashMap<>())
            .computeIfAbsent(curFragmentInstanceId, x -> new HashMap<>())
            .merge(curPlanNodeId, bytesToReserve, Long::sum);
        futureList.add(future);
        iterator.remove();
      }
    }
  }

  // why we need to put this outside MemoryPool's lock?
  // If we put this block inside the MemoryPool's lock, we will get deadlock case like the
  // following:
  // Assuming that thread-A: LocalSourceHandle.receive() -> A-SharedTsBlockQueue.remove() ->
  // MemoryPool.free() (hold MemoryPool's lock) -> future.set(null) -> try to get
  // B-SharedTsBlockQueue's lock
  // thread-B: LocalSourceHandle.receive() -> B-SharedTsBlockQueue.remove() (hold
  // B-SharedTsBlockQueue's lock) -> try to get MemoryPool's lock
  for (MemoryReservationFuture<Void> future : futureList) {
    try {
      future.set(null);
    } catch (Throwable t) {
      // ignore it, because we still need to notify other future
      LOGGER.error("error happened while trying to free memory: ", t);
    }
  }
}

总结

上述流程存在几点问题:

  1. SourceHandle 无法对事件到达做任务顺序的假设,导致 SourceHandle 的编写有些复杂,需要考虑事件乱序到达的情况。

  1. 假设 NewDataBlockEvent 事件顺序到达,且 SourceHandle 的消费速度与 SinkHandle 的生成速度一致,则传输一个 TsBlock,需3次 RPC,网络开销较大:

  1. SinkHandle 发送 NewDataBlockEvent

  1. SourceHandle 拉取 NewDataBlockEvent 对应 sequenceId的 TsBlock

  1. SourceHandle 成功拉取到 TsBlock 后,发送 ack 消息

传输一个 TsBlock 需要三次 rpc 通信的设计初衷:

  • 控制数据传输线程数量,并且不让一个 SourceHandle 占据一个线程过久,以致其他 SourceHandle 无法发起数据传输请求:

  • 因为 thrift rpc 是同步的,所以如果没有 NewDataBlockEvent,SourceHandle 盲目去拉取数据,那么可能SinkHandle 端数据还未准备好。如果阻塞等待 SinkHandle 产生数据后,返回此次 rpc 结果,会导致一个SourceHandle 占据一个数据传输线程过久,在数据传输线程总数一定的情况下,其他 SourceHandle 的请求会得不到及时处理。

  • 所以需要依赖 SinkHandle 的 NewDataBlockEvent 通知,在 SinkHandle 数据准备好的时候,发送一个 rpc 去通知 SourceHandle 拉取数据。此时 SourceHandle 拉取数据的 rpc 虽然也是同步的,但是 SinkHandle 端的数据一定是准备好的,所以该 rpc 同步阻塞占用数据传输线程的时间很短,不会导致 SourceHandle的请求过多等待

  • 防止 TsBlock 丢失,启用 ack 消息去做容错

  • 如果 SinkHandle 在收到 SourceHandle 拉取请求的 rpc 后,就将对应的 TsBlock 释放,那么会存在数据丢失的风险:网络问题导致此次 rpc 失败,虽然 SourceHandle 那边会重试,但是 SinkHandle 在处理上次 rpc 请求时,已经把对应的 TsBlock 释放掉了,导致 SourceHandle 的重试也是徒劳无功。

  • 内存控制实现较为简单,因为在拉取数据之前就已经得知每个 TsBlock 的大小,所以可以判断 SourceHandle 端内存是否足够,足够了才去拉取,无需提前预分配内存给查询,每次即时申请即可。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Apache IoTDB 查询引擎源码阅读——数据异步传输模块 的相关文章

随机推荐

  • 【Matlab】建立最优控制LQR控制器模型

    前言 之前做了有关于系统辨识以及配置极点来控制系统响应的相关内容 xff0c 那些相当于是打开了一个新世界的大门 xff0c 在此基础上 xff0c 一直想学习一下最优控制的相关内容 xff0c 并应用在项目上 xff0c 因此 xff0c
  • 人工智能算法公式中常见的数据符号的定义和解释?

    长期更新中 没有记录的希望大家留言补充 对数 xff1a xff08 log xff0c lg xff0c ln xff0c lb xff09 log log4 xff08 8 xff09 61 log4 xff08 4 2 xff09 6
  • 我的2014

    弹指间2014过去了 xff0c 在过去的一年里 xff0c 或许你收获了成功 xff0c 取得了令人瞩目的成绩 又或许你失意落魄 xff0c 躲在角落了自舔伤口 但这些都不重要 xff0c 重要的是今年是2015不是2014 新的一年里有
  • 51单片机实现串口偶校验

    1 STC单片机串口 2 PSW是一个8位寄存器 PSW的全称是Program Status Word xff0c 即程序状态字 奇偶标志位P 每执行一条汇编指令 xff0c 单片机都能根据A中1的个数的奇偶自动令P置位或清零 xff0c
  • SLAM十四讲第二版ch7位姿估计实践的编译问题

    在看ch7中遇到了g2o OptimizableGraph Vertex clone const 未定义的引用的问题 为了方便以后复现 xff0c 记录如下 我的环境配置 Pangolin 0 5 Opencv 3 4 16 cere 1
  • SLAM十四讲第二版ch6的未定义的引用问题

    在使用slam十四讲第二版做题的时候发现 xff0c ch6编译问题 xff0c 特此记录如下 make时遇到问题如下 xff1a 对 g2o OptimizableGraph Vertex clone const 未定义的引用 对 g2o
  • Ubuntu18.04配置ORB_SLAM3(ROS)

    一 安装ROS 建议按照官网操作 cn melodic Installation Ubuntu ROS Wiki http wiki ros org cn melodic Installation Ubuntu 二 安装eigen3 3 7
  • Semantic Visual Simultaneous Localization and Mapping: A Survey阅读笔记

    Abstract xff1a 通过语义和vslam结合可以很好解决动态和复杂环境中良好定位 首先回顾了语义vslam发展 xff0c 关注优势和差异 其次探讨了 xff1a 语义信息提取和关联 语义的应用和语义的优势然后收集分析了最先进的s
  • git上传到gitee的记录

    一 安装和配置 sudo apt get install git git config global user email 你的邮箱 git config global user name 64 你的昵称 二 上传 先在github或者gi
  • Burst Imaging for Light-Constrained Structure-From-Motion论文翻译记录

    准备开始2022ICRA的SLAM论文阅读记录 Abstract 在极低光照条件下拍摄的图像受噪声限制 xff0c 会导致现有的机器人视觉算法失效 在本文中 xff0c 我们开发了一种图像处理技术 xff0c 用于从弱光条件下采集的图像中辅
  • ROS常用指令

    非代码 一 创建工作空间 mkdir p catkin ws src cd catkin ws src catkin init workspace 二 创建功能包 cd workspace src catin create pkg name
  • 【Matlab】线性跟踪微分器

    线性跟踪微分器介绍 xff1a 线性跟踪微分器出自自抗扰控制ADRC xff0c 线性跟踪微分器有两个作用 xff0c 一是可以用来滤波 xff0c 而是可以用来求取输入的微分 这里有一篇文章可以推荐看看 xff1a https blog
  • FreeRTOS启动流程

    Reset Handler xff1a 芯片上电默认进Reset Handler SystemInit xff1a 初始化时钟及中断向量映射 main xff1a main函数入口 main xTaskCreate xff1a pvPort
  • 基于STM32F411使用SPI+DMA驱动LCD

    先看效果 F411CE 硬件SPI xff0b DMA驱动LCD 基于HAL库 其实HAL库我用不太习惯 xff0c 一直也是用的标准库 但HAL库确实是好上手一些 xff0c 就迅速创建一个新的template 这次就当尝试一下吧 xff
  • 动手深度学习-环境配置(手动安装,一步一步教你,有截图可看)

    一 xff1a 官网教程 这一部分对应了书中的第二讲 xff1a 预备知识部分 因为我是Windows用户 xff0c 所以这里先只讲Windows部分的安装过程 1 xff1a 第一步是根据操作系统下载并安装Miniconda xff0c
  • 机器学习算法原理与实践(三)、卡尔曼滤波器算法浅析及matlab实战

    协方差矩阵 状态协方差矩阵传递 状态协方差的更新 Matlab 实现 Matlab效果 测试代码 测试效果 原创 Liu LongPo 转载请注明出处 CSDN http blog csdn net llp1992 卡尔曼滤波器是一种利用线
  • Pixhawk之获取传感器数据并更新姿态

    博主 xff1a UAV 声明 xff1a 尊重版权 xff0c 转载请注明出处 原文地址 xff1a 联系方式 xff1a 595493514 64 qq com 技术交流QQ xff1a 595493514 read AHRS 是负责更
  • 类与对象以及类的继承

    类与对象以及类的继承 Java是一门面向对象的语编程言 世界上有众多对象 xff0c 我们把具有相同属性和方法的对象归为一个类 因此 xff0c 类 便是Java代码中的基本单位 下面是对一些名词的解释 类 类是一个模板 xff0c 用来定
  • 如何使用 Apache IoTDB 触发器

    Apache IoTDB 触发器提供了一种侦听序列数据变动的机制 配合用户自定义逻辑 xff0c 可完成告警 数据转发等功能 触发器基于 Java 反射机制实现 用户通过简单实现 Java 接口 xff0c 即可实现数据侦听 IoTDB 允
  • Apache IoTDB 查询引擎源码阅读——数据异步传输模块

    本文参考了 Apache IoTDB 社区成员田原和王中的设计文档 xff0c 由于飞书链接限制 xff0c 本文没有贴出参考链接 背景 Apache IoTDB 查询引擎目前采用 MPP 架构 xff0c 一条查询 SQL 大致会经历下图