- 发现一开始就深入源码,很容易陷进去,特别是模块非常多的情况,需要看很多遍才能理解清楚。
- 要写出更容易理解的文档,需要的不是事无巨细的分析代码,更主要的是能够把复杂的东西抽象出来,变为简单的东西。一个很简答的例子是画函数调用流程图很简单,但是要把流程图转换成框图却很难。
简介
从 ROS 系统说起
Apollo 最初用的中间件是 ROS(机器人操作系统),在 v3.0 之前用的都是基于 ROS 框架进行开发。概括来说,ROS 系统主要包含三方面:
- 第一是通信系统,ROS 是个分布式的松耦合系统,算法模块是以独立的进程形式存在的,也就是我们常说的 Node。ROS 基于 Socket 实现了pub/sub 的通信方式,不同的算法节点(node)之间通过 pub/sub 的发送/接收消息。
- 第二是 Framework&Tools (框架和工具),开发者可以基于 ROS 提供的 Client Library 和通信层,方便的收发消息。开发者只需要关注消息处理相关的算法,而至于算法何时被调用,全部由框架来处理。
- 第三是生态系统,从社区内,开发者可以很方便地寻找到很多现成的「传感器驱动」和「算法实现」等进行参考。
随着自动驾驶的发展,不少开发者,包括 Apollo 平台,把 ROS 应用于自动驾驶系统,毕竟自动驾驶汽车也相当于一个大的机器人。但是我们在实践中也遇到了很多挑战:
- 首先,ROS 中的算法模块是以独立的进程形式存在的,那么这些进程之间应该以什么样的顺序去执行?实际上,Linux 本身是一个通用系统,内核中的调度器对上面的算法业务逻辑并不清楚,它只是在尽量满足公平的情况下让大家都得到调度。所以,ROS Node 运行顺序并无任何逻辑。但本质上自动驾驶是一个专用系统,任务应该按照一定的业务逻辑执行。那么是在 ROS 层加一个 Node,由其来同步各个算法任务的运行,还是在Linux 内核中实现新的调度策略,使其结合算法业务逻辑进行调度?前者的开销,后者的迁移性,都是需要思考的问题。
- 其次,ROS 是一个分布式的系统。既然是分布式,就要有通信的开销。即使在同一个物理节点上,依然存在着通信的开销。所以 Apollo 前期曾经使用共享内存去降低 ROS 原生的基于 Socket 通信的开销。ROS 2 也在使用 DDS 解决通信方面的实时性。ROS 也支持 Nodelet 模式,这可以去掉进程间通信的开销,但是调度的挑战依然存在。
- 第三,除了调度的不确定性,ROS 系统中还存在其他很多不确定的地方,比如内存的动态申请。
随着 Apollo 的发展,对最高水平的稳健性和性能的需求, Apollo Cyber RT 应运而生,它满足了一个面向商业化的自动驾驶解决方案的基础需求。
是什么
- CyberRT是一个开源,高性能的运行行框架,专为自动驾驶场景设计。针对自动驾驶的高并发、低延迟、高吞吐量进行了大幅优化。
- CyberRT是百度推出的替代ROS的消息中间件,自动驾驶中的各个模块通过cyber进行消息订阅和发布,同时cyber还提供了任务调度、录制bag包等功能。Apollo提供了 CyberRT作为中间体,对计算任务和通信进行优化
- CyberRT处于底层的实时操作系统和算法模块之间,能够在保证高吞吐的情况下,又能够低延迟的实时响应上层任务,并保证整个系统确认性的运转
- CyberRT的核心理念是基于组件,组件有预先设定的输入输出。
- 实际上,每个组件都是一个专用的算法模块,框架可以根据所有预定义的组件生成有向无环图 (DAG)。
- 所有的这些组件都是基于cyberRT提供的调度程序mainboard加载运行。
- 在运行时,框架把融合好的传感器数据和预定义的组件打包在一起形成用户级轻量任务,之后,框架的调度器可以根据资源可用性和任务优先级来派发这些任务。
总结:
- CyberRT可以看做是操作系统之上的一层"RTOS"。
- CyberRT采用协程作为调度的基本单位,将原有的内核态调度,变为用户态调度。系统的线程,在CyberRT眼里,可以看做"CPU"。
apollo中很多功能都是基于Cyber RT的模块框架开发的,其声明周期由cyber RT管理。
特色
- 高性能:无锁对象,协程(coroutine),自适应通信机制;
- 确定性:可配置的任务以及任务调度,通过协程将调度从内核空间转移到用户空间;
- 模块化:在框架内实现组件以及节点,即可完成系统任务;
- 便利性:创建和使用任务
术语
Cyber |
ROS |
说明 |
Component |
无 |
- 在自动驾驶系统中,模块(如感知、定位、控制系统等)在 Cyber RT 下以 Component 的形式存在。
- 不同 组件 之间通过 Channel 进行通信。
- Component 概念不仅解耦了模块,还为将模块拆分为多个子模块提供了灵活性。
|
Channel |
Topic |
- Channel 用于管理 Cyber RT 中的数据通信。用户可以通过 publish/subscribe 相同的 channel 来通信。
|
Node |
Node |
- Node 是 Cyber RT 的基本组成部分。每个模块都包含一个 Node 并通过 Node 进行通信。
- 通过在节点中定义Reader/Writer 或 Service/Client,模块可以具有不同类型的通信形式
|
Reader/Writer |
Publish/Subscribe |
- 订阅者模式。往 channel 读写消息的类。 通常作为 Node 主要的消息传输接口。
|
Service/Client |
Service/Client |
- 除 Reader/Writer 外,Cyber RT 还提供了用于模块通信的 Service/Client 模式。
- 它支持节点间的双向通信。
|
Message |
Message |
- Cyber RT 中用于模块间通信的数据单元。其实现基于 protobuf
|
Parameter |
Parameter |
- 参数服务在 Cyber RT 中提供了全局参数访问接口。它是基于 Service/Client 模式构建的。
|
Record 文件 |
Bag 文件 |
- 用于记录从 channel 发送或接收的消息。 回放 record file 可以重现之前的操作行为。
|
Launch 文件 |
Launch 文件 |
- 提供一种启动模块的便利途径。通过在 launch file 中定义一个或多个 dag 文件,可以同时启动多个 modules。
|
Task |
无 |
|
CRoutine |
无 |
- 参考协程(Coroutine)的概念,Cyber RT 实现了 Coroutine 来优化线程使用和系统资源分配。
|
Scheduler |
无 |
- 为了更好地支持自动驾驶场景,Cyber RT 提供了多种资源调度算法供开发者选择。
|
Dag 文件 |
无 |
- Dag 文件是模块拓扑关系的配置文件。
- 可以在 dag 文件中定义使用的 Component 和上游/下游通道。
|
服务发现 |
|
- 作为一个去中心化的框架,Cyber RT 没有用于服务注册的主/中心节点。所有节点都被平等对待,可以通过“服务发现”找到其他服务节点。使用 UDP 用来服务发现。
|
易混淆的概念:module(模块)和component(组件),在Cyber RT中,一个module可以由多个component组成。
架构
可以粗略的分为上面几个大的模块
-
消息队列:主要作用是接收和发送各个节点的消息,涉及到消息的发布、订阅以及消息的缓存等
-
实时调度:主要是调度处理上述消息的算法模块,保证算法模块能够实时调度处理消息
-
用户接口:提供相关接口,将算法模块接入CyberRT的框架之内。
- Log+Tool:提供高效的日志打印,以及一系列的工具,比如bag包播放,点云可视化,消息监控灯
总结起来就是,cyber是一个分布式收发消息,和调度框架,同时对外提供一系列的工具和接口来辅助开发和定位问题。
详细一点如下图:
- 最下面一层是基础库,为了高效,Cyber RT 实现了自己的基础库。比如我们实现了 Lock-Free 的对象池,实现了 Lock-Free 的队列,随着成熟,会陆续开放更多。除了框架自身外,将来也会逐渐应用于算法模块。除了效率原因为,也希望 Cyber RT 减少依赖。
- 再往上是通信相关的,包括服务发现,还有 Publish-Subscribe 通信机制。 Cyber RT 也支持跨进程、跨机通信,上层业务逻辑无需关心,通信层会根据算法模块的部署,自动选择相应通信机制。
- 通信层之上是 数据缓存/融合层,多路传感器之间数据需要融合,而且算法可能需要缓存一定的数据。比如典型的仿真应用,不同算法模块之间需要有一个数据桥梁,数据层起到了这个模块间通信的桥梁的作用。
- 再往上是计算模型,包括调度层和任务。
- 最上面是提供给开发者的接口层。
- Cyber RT为开发者提供了Component 类,开发者的算法业务模块只需要继承该类,实现其中的 Proc 接口即可。该接口类似于 ROS 中的 Callback,消息通过参数的方式传递,用户只要在Proc中实现算法、消息处理相关的逻辑。Cyber RT 也基于协程,为开发者提供了并行计算相关的接口。
- Cyber RT 也为开发者提供了开发调试、录制回放等工具
概述
源码概览
Node是cyber通信系统中最顶层的类
- 每个Component都有一个Node
- 每个Node都会通过NodeChannelImpl和NodeServiceImpl位创建Reader, Writer, Service, Client来帮该组件获取信息或者传达信息。
NodeChannelImpl和NodeServiceImpl位于通信的第一层
- NodeChannelImpl是Node用来创建Channel相关的Reader和Writer的类。
- NodeServiceImpl和NodeChannelImpl类似,只不过它创建的是Service和Client,还会注册service
数据处理流程
我们先看下cyber中整个的数据处理流程,通过理解数据流程中各个模块如何工作,来搞清楚每个模块的作用
如上图所示,cyber的数据流程可以分为6个过程:
- Node节点中的Write往通道里面写数据
- 通道中的Transmitter发布消息,通道中的Receiver接收消息
- Receiver接收到消息之后,触发回调,触发DataDispather进行消息分发
- DataDispather接收到消息之后,把消息放入CacheBuffer中,并且触发Notifier,通知对应的DataVisitor处理消息
- DataVisitor把数据从CacheBuffer中读出,并且进行融合,然后通过notifier_唤醒对应的协程
- 协程执行对应的注册回调函数,进行数据处理,处理完成之后进入睡眠状态
整体介绍
Component和Node的关系
- Component是Cyber中封装好的数据处理流程,对用户来说,对应自动驾驶中的Planning Component,Perception Component等,目的是帮助我们更方便的订阅和处理消息。
- 实际上Component模块在加载之后会执行"Initialize()"函数,这是个隐藏的初始化过程,对用户不可见
- 在"Initialize()"中,Component会创建一个Node节点,每个Component模块只能有一个Node节点,在Node节点中进行消息订阅和发布。
Node和Reader\Writer的关系
在Node节点中可以创建Reader订阅消息,也可以创建Writer发布消息,每个Node节点中可以创建多个Reader和Writer。
Reader和Receiver,Writer和Transmitter,Channel的关系
- 一个Channel对应一个Topic,概念上对应ROS的消息通道,每个Topic都是唯一的。而Channel中包括一个发送器(Transmitter)和接收器(Receiver),通过Receiver接收消息,通过Transmitter发送消息。
- 一个Reader只能订阅一个通道的消息,如果一个Node需要订阅多个通道的消息,需要创建多个Reader。同理一个Writer也只能发布一个通道的消息,如果需要发布多个消息,需要创建多个Writer。Reader中调用Receiver订阅消息,而Writer通过Transmitter发布消息。
Receiver, DataDispatcher和DataVisitor的关系
- 每一个Receiver接收到消息之后,都会触发回调,回调中触发DataDispather(消息分发器)发布消息,DataDispather是一个单例,所有的数据分发都在数据分发器中进行,DataDispather会把数据放到对应的缓存中,然后Notify(通知)对应的协程(实际上这里调用的是DataVisitor中注册的Notify)去处理消息。
- DataVisitor(消息访问器)是一个辅助的类,一个数据处理过程对应一个DataVisitor,通过在DataVisitor中注册Notify(唤醒对应的协程,协程执行绑定的回调函数),并且注册对应的Buffer到DataDispather,这样在DataDispather的时候会通知对应的DataVisitor去唤醒对应的协程。
- 也就是说DataDispather(消息分发器)发布对应的消息到DataVisitor,DataVisitor(消息访问权)唤醒对应的协程,协程中执行绑定的数据处理
DataVisitor和Croutine的关系
- 实际上DataVisitor中的Notify是通过唤醒协程,每个协程绑定了一个数据处理函数和DataVisitor,数据到达之后,通过DataVisitor中的Notify唤醒对应的协程,执行数据处理回调,执行完成之后协程进入休眠状态
Scheduler, Task和Croutine
- 数据处理的过程实际上就是通过协程完成的,每一个协程被称为一个Task,所有的Task(任务)都由Scheduler进行调度。
运行流程
- 算法模块通过有向无环图(DAG),配置任务之间的逻辑关系。对于每个算法,也有其优先级,运行时间,使用资源等方面的配置。
- 系统启用时,结合而dag,调度配置等,创建相应的任务,从框架内部来讲,就是协程,调度器把任务放到各个Processor队列中
- 然后,由Sensor输入数据,驱动整个系统运转
从内核空间到用户空间
调度
- ROS 的主要挑战之一是没有调度,为了解决 ROS 遇到的问题,Cyber RT 的核心设计将调度、任务从内核空间搬到了用户空间,在原生的thread上加了一层协程(Coroutine),Cyber RT主要调度的就是协程。调度可以和算法业务逻辑紧密结合。
- 从 Cyber RT 角度,OS 的 Native thread 相当于物理 CPU。
- 在 OS 中,是内核中的调度器负责调度任务(进程、线程…)到物理 CPU 上运行。而在 Cyber RT 中,Cyber RT 中的调度器调度协程(Coroutine)在 Native Thread 上有序运行。
调度编排策略
程序员眼中的CyberRT,可以分为三个层级
服务及接口:Component,Service
数据融合,任务调度:
- crontine
- Scheduler
- Data
- Node
- Blocker
数据传输,服务发现:service_discovery,transport
使用示例
可以在这个目录/apollo/cyber/examples/找到很多例子
Note: 这些例子必须运行在 Apollo docker 环境, 且需要通过 Bazel 来编译。
talker.cc消息发布
cyber/examples/talker.cc内消息发布分为三步:
- talker_node创建
- talker创建
- 循环写入数据
//cyber/examples/talker.cc
// init cyber framework
apollo::cyber::Init(argv[0]);
// 1、talker_node创建
auto talker_node = apollo::cyber::CreateNode("talker");
// 2、talker创建
auto talker = talker_node->CreateWriter<Chatter>("channel/chatter");
Rate rate(1.0);
uint64_t seq = 0;
while (apollo::cyber::OK()) {
auto msg = std::make_shared<Chatter>();
msg->set_timestamp(Time::Now().ToNanosecond());
msg->set_lidar_timestamp(Time::Now().ToNanosecond());
msg->set_seq(seq);
msg->set_content("Hello, apollo!");
// 3、循环写入数据
talker->Write(msg);
AINFO << "talker sent a message! No. " << seq;
seq++;
rate.Sleep();
}
在cyber/node/node.h内 创建node_channel_impl_对象,通过node_channel_impl_的CreateWriter函数创建 writer_ptr对象,然后执行writer_ptr对象的Init()函数
//cyber/node/node_channel_impl.h
template <typename MessageT>
auto NodeChannelImpl::CreateWriter(const proto::RoleAttributes& role_attr)
-> std::shared_ptr<Writer<MessageT>> {
if (!role_attr.has_channel_name() || role_attr.channel_name().empty()) {
AERROR << "Can't create a writer with empty channel name!";
return nullptr;
}
proto::RoleAttributes new_attr(role_attr);
FillInAttr<MessageT>(&new_attr);
//创建writer_ptr对象
std::shared_ptr<Writer<MessageT>> writer_ptr = nullptr;
if (!is_reality_mode_) {
writer_ptr = std::make_shared<blocker::IntraWriter<MessageT>>(new_attr);
} else {
writer_ptr = std::make_shared<Writer<MessageT>>(new_attr);
}
//执行Init()函数
RETURN_VAL_IF_NULL(writer_ptr, nullptr);
RETURN_VAL_IF(!writer_ptr->Init(), nullptr);
return writer_ptr;
}
Init函数创建了transmitter_对象,以及把当前节点信息加入到拓扑图中:
// cyber/node/writer.h
template <typename MessageT>
bool Writer<MessageT>::Init() {
{
std::lock_guard<std::mutex> g(lock_);
if (init_) {
return true;
}
//创建transmitter_对象用于消息发布
transmitter_ =
transport::Transport::Instance()->CreateTransmitter<MessageT>(
role_attr_);
if (transmitter_ == nullptr) {
return false;
}
init_ = true;
}
this->role_attr_.set_id(transmitter_->id().HashValue());
//创建channel_manager_对象用于消息管理
channel_manager_ =
service_discovery::TopologyManager::Instance()->channel_manager();
//把当前节点信息加入到拓扑图中
JoinTheTopology();
return true;
}
参考