【ROS】源码分析-消息订阅与发布

2023-05-16

说明

本文通过NodeHandle::subscribePublication::publish()源码作为入口,来分析PubNode、SubNode之间是网络连接是如何建立的,消息是如何发布的,topic队列到底是如何实现的。

源码目录

ros_comm/clients/roscpp: ROS Node和底层的RPC通讯协议实现,都是c++代码。
ros_comm/tools/rosmaster: ROS Master的功能代码,都是python代码。
ros_tutorials/turtlsim:小海龟的PubNode和SubNode。

源码分析

各个组件 init & start

各个组件是如何启动的

int main(int argc, char** argv)
{
  ros::init(argc, argv, "teleop_turtle");
}

开始前先看下ros::init中的主要初始化代码:这里初始化了网络工具、master服务工具、本Node管理工具、Log工具、参数服务工具

(源码文件:ros_comm/clients/roscpp/src/libros/init.cpp)
void ros::init(const M_string& remappings, const std::string& name, uint32_t options) {
    network::init(remappings);
    master::init(remappings);
    // names:: namespace is initialized by this_node
    this_node::init(name, remappings, options);
    file_log::init(remappings);
    param::init(remappings);
}

然后是NodeHanle创建(构造函数),并调用ros::start(),从而启动各个组件

(源码文件:ros_comm/clients/roscpp/src/libros/node_handle.cpp)
NodeHandle::NodeHandle(const NodeHandle& parent, const std::string& ns, const M_string& remappings)
: collection_(0)
{
  namespace_ = parent.getNamespace();
  construct(ns, false);
}
void NodeHandle::construct(const std::string& ns, bool validate_name) {
   ros::start();
}

void start() {
  param::param("/tcp_keepalive", TransportTCP::s_use_keepalive_, TransportTCP::s_use_keepalive_);
  PollManager::instance()->addPollThreadListener(checkForShutdown);
  XMLRPCManager::instance()->bind("shutdown", shutdownCallback);
  TopicManager::instance()->start();
  ServiceManager::instance()->start();
  ConnectionManager::instance()->start();
  PollManager::instance()->start();
  XMLRPCManager::instance()->start();
}

XMLRPCManager 是ROS中RPC通讯的基础工具,下面看一下主要的功能:这里注册了几个主要的XMLHTTP方法(如publisherUpdate)的处理函数(如pubUpdateCallback), XMLHTTP请求本Node的request会先进入这些设定的callback方法,这些callback进过初步解析参数然后调用本Node具体的实现方法。

(源码文件:ros_comm/clients/roscpp/src/libros/topic_manager.cpp)
void TopicManager::start()
{
  boost::mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
  shutting_down_ = false;

  poll_manager_ = PollManager::instance();
  connection_manager_ = ConnectionManager::instance();
  xmlrpc_manager_ = XMLRPCManager::instance();

  xmlrpc_manager_->bind("publisherUpdate", boost::bind(&TopicManager::pubUpdateCallback, this, boost::placeholders::_1, boost::placeholders::_2));
  xmlrpc_manager_->bind("requestTopic", boost::bind(&TopicManager::requestTopicCallback, this, boost::placeholders::_1, boost::placeholders::_2));
  xmlrpc_manager_->bind("getBusStats", boost::bind(&TopicManager::getBusStatsCallback, this, boost::placeholders::_1, boost::placeholders::_2));
  xmlrpc_manager_->bind("getBusInfo", boost::bind(&TopicManager::getBusInfoCallback, this, boost::placeholders::_1, boost::placeholders::_2));
  xmlrpc_manager_->bind("getSubscriptions", boost::bind(&TopicManager::getSubscriptionsCallback, this, boost::placeholders::_1, boost::placeholders::_2));
  xmlrpc_manager_->bind("getPublications", boost::bind(&TopicManager::getPublicationsCallback, this, boost::placeholders::_1, boost::placeholders::_2));

  poll_manager_->addPollThreadListener(boost::bind(&TopicManager::processPublishQueues, this));
}

Subscriber注册过程分析(消息订阅)

假设:publisher先启动,然后再启动 subscriber

subscriber启动后,向Master注册自己,并订阅topic

velocity_sub_ = nh_.subscribe("cmd_vel", 1, &Turtle::velocityCallback, this);

然后进入Master的registerSubscriber方法,注册完成后Master返回了Publisher的服务地址(pub_uris )

def registerSubscriber(self, caller_id, topic, topic_type, caller_api):

@apivalidate([], ( is_topic('topic'), valid_type_name('topic_type'), is_api('caller_api')))
    def registerSubscriber(self, caller_id, topic, topic_type, caller_api):
        try:
            self.ps_lock.acquire()
            # 1.记录subNode信息:id、topic、subServerURL
            self.reg_manager.register_subscriber(topic, caller_id, caller_api)
            if not topic in self.topics_types and topic_type != rosgraph.names.ANYTYPE:
                self.topics_types[topic] = topic_type
            mloginfo("+SUB [%s] %s %s",topic, caller_id, caller_api)
            # 2.返回发布者(publisher)的信息给订阅者(subscriber)
            pub_uris = self.publishers.get_apis(topic)
        finally:
            self.ps_lock.release()
        return 1, "Subscribed to [%s]"%topic, pub_uris

拿到pub_urils后进入Subscription.pubUpdate开始更新publisher的服务地址(与下图中的Subscriber Node中的pubUpdate 完全一致)

(源码文件:ros_comm/clients/roscpp/src/libros/topic_manager.cpp)

bool TopicManager::subscribe(const SubscribeOptions& ops){
    SubscriptionPtr s(boost::make_shared<Subscription>(ops.topic, md5sum, datatype, ops.transport_hints));
    s->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object, ops.allow_concurrent_callbacks);

    registerSubscriber(s, ops.datatype)
}

bool TopicManager::registerSubscriber(const SubscriptionPtr& s, const string &datatype) {

	XmlRpcValue args, result, payload;
	args[0] = this_node::getName();
	args[1] = s->getName(); //ops.topic
	args[2] = datatype;
	args[3] = xmlrpc_manager_->getServerURI();

	master::execute("registerSubscriber", args, result, payload, true);

	vector<string> pub_uris;
	for (int i = 0; i < payload.size(); i++)
	{
		if (payload[i] != xmlrpc_manager_->getServerURI())
		{
		  pub_uris.push_back(string(payload[i]));
		}
	}
	s->pubUpdate(pub_uris);//更新pubUrl: Subscription.pubUpdate
}

下面看一下SubNode如何更新pubUrls

(源码文件:ros_comm/clients/roscpp/src/libros/subscription.cpp)

bool Subscription::pubUpdate(const V_string& new_pubs) {

	//1. 打印 new_pubs (最新的pubUrls) 和 publisher_links_(本地存量记录的pubUrls)
	ROSCPP_LOG_DEBUG("Publisher update for [%s]: %s", name_.c_str(), ss.str().c_str());

	//2. 遍历 publisher_links_ 找出已经下线的 pubUrl (即:不在new_pubs中的url), 稍后进行释放 drop
	subtractions.push_back(*spc);

	//3. 对比本地存量记录的pubUrls找出本次新增的urls, 并尝试建立连接
	additions.push_back(*up_i);
	for (V_string::iterator i = additions.begin();
            i != additions.end(); ++i)
    {	
    	 retval &= negotiateConnection(*i);
    }
}

bool Subscription::negotiateConnection(const std::string& xmlrpc_uri){
	//与publisher通信,协商改topic的通信协议TCP/UDP
	c->executeNonBlock("requestTopic", params)
	
	//创建连接(实际是在 XMLRPCManager::serverThreadFunc 方法中创建连接,连接成功后通过回调pendingConnectionDone,最后publisher_links_.push_back(link))
	PendingConnectionPtr conn(boost::make_shared<PendingConnection>(c, udp_transport, shared_from_this(), xmlrpc_uri));
	XMLRPCManager::instance()->addASyncConnection(conn);
	// Put this connection on the list that we'll look at later.
	{
		boost::mutex::scoped_lock pending_connections_lock(pending_connections_mutex_);
		pending_connections_.insert(conn);
	}
}

最后Subscriber就主动建立了与Publisher的连接。

Publisher注册过程分析(Topic声明/发布)

假设:subscriber先启动,然后再启动 publisher

初始化完成后,然后Publisher向master发布topic信息(即注册自己的id&topic&serverUrl信息)

ros::NodeHandle nh_;
twist_pub_ = nh_.advertise<geometry_msgs::Twist>("turtle1/cmd_vel", 1);

然后进入Master的registerPublisher方法,在注册过程中Master会通知(_notify_topic_subscribers)已订阅该topic的Subscriber(这里假设Subscriber先启动)Publisher的服务地址(pub_uris )。

如何进入的Master方法在《【ROS】源码分析-服务发现原理》中有讲解:master::execute("registerPublisher"

(源码文件:ros_comm/tools/rosmaster/src/rosmaster/master_api.py)
    @apivalidate([], ( is_topic('topic'), valid_type_name('topic_type'), is_api('caller_api')))
    def registerPublisher(self, caller_id, topic, topic_type, caller_api):
        try:
            self.ps_lock.acquire()
            # 1.记录pubNode信息:id、topic、pubServerURL
            self.reg_manager.register_publisher(topic, caller_id, caller_api)
            # don't let '*' type squash valid typing
            if topic_type != rosgraph.names.ANYTYPE or not topic in self.topics_types:
                self.topics_types[topic] = topic_type
            pub_uris = self.publishers.get_apis(topic)
            sub_uris = self.subscribers.get_apis(topic)
            # 2.通知订阅者该topic的pubisherServer的信息
            self._notify_topic_subscribers(topic, pub_uris, sub_uris)
            mloginfo("+PUB [%s] %s %s",topic, caller_id, caller_api)
            # 3.返回订阅者(subscriber)的信息给发布者(publisher)
            sub_uris = self.subscribers.get_apis(topic)            
        finally:
            self.ps_lock.release()
        return 1, "Registered [%s] as publisher of [%s]"%(caller_id, topic), sub_uris

Subscriber得知pub_urls后会主动尝试与Publisher建立连接(与Subscriber注册过程中的pubUpdate完全一致),整个过程如下图所示:
Publisher注册和Topic声明过程

Publisher发布消息过程分析

Subscriber从pubUpdate中开始一步建立与Publisher的网络连接通道(conn),主要过程如下图:
在这里插入图片描述

网络连接建立玩后,就是开始发布消息了

geometry_msgs::Twist twist;
twist.angular.z = a_scale_*angular_;
twist.linear.x = l_scale_*linear_;
twist_pub_.publish(twist);    
  • Publisher先发布消息到本地队列,然后poll_manager会轮训经本地队列中的消息发给Subscriber。
  • Subscriber接收到消息后放到Callback本地队列,AsyncSpinner会轮训调度读取本地队列中的数据,并根据消息的type匹配出callback_func,然后进行回调。

在这里插入图片描述

Topic消息队列实现是怎样的

基于topic队列发送消息过程中的本地队列默认实现是

init.cpp

void ros::init(const M_string& remappings, const std::string& name, uint32_t options){
    g_global_queue.reset(new CallbackQueue);
}

CallbackQueue* getGlobalCallbackQueue()
{
  return g_global_queue.get();
}

callback_queue.h

class ROSCPP_DECL CallbackQueue : public CallbackQueueInterface 
{
    struct CallbackInfo
   {
    CallbackInfo()
	    : removal_id(0)
	    , marked_for_removal(false)
	    {}
	    CallbackInterfacePtr callback;
	    uint64_t removal_id;
	    bool marked_for_removal;
	};

    typedef std::deque<CallbackInfo> D_CallbackInfo;
    D_CallbackInfo callbacks_;
}

publication.h

class ROSCPP_DECL Publication
{
  typedef std::vector<SerializedMessage> V_SerializedMessage;
  V_SerializedMessage publish_queue_;
}

所以Publisher/Subscriber本地队列是内存队列/列表(std::dequestd::vector)。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【ROS】源码分析-消息订阅与发布 的相关文章

随机推荐

  • Ubuntu20搭建pytorch深度学习框架——使用增强样本运行Dlinknet提取道路水体(五)——样本增强

    之前运行的样本是未进行过样本增强的 做出来多少样本就使用了多少样本来进行训练 这里复盘一下样本训练结果 首先是道路 使用512512分辨率的16461张标签和16461张原图进行训练 接下来是水体 使用512512分辨率的11503张标签和
  • Ubuntu20运行SegNeXt代码提取道路水体(二)——SegNeXt源代码安装到测试环境配置全过程摸索

    首先我们在第一篇里面已经下载了SegNeXt代码 打开源代码 查看readme文件 我们先安装一下里面提到的torchprofile 链接在这 其实只要这个语句就能安装 pip install torchprofile 这一步没什么问题 很
  • pyqt5页面美化全流程摸索(二)——为控件增加下拉选项及增加鼠标悬停后改变按钮颜色功能

    我想为我的一个控件增加菜单栏 搞了半天 查阅了大量文章 都失败了 没法给ToolButton增加menu啊 xff01 我是这么尝试的 首先查阅下列文章 pyqt5学习 菜单栏 xff08 QMenu xff09 工具栏QToolBar学习
  • 什么是NullReferenceException,如何解决?

    这篇文章是 社区维基 编辑现有答案以改善此职位 它当前不接受新的答案 我有一些代码 xff0c 执行时会抛出NullReferenceException xff0c 说 xff1a 你调用的对象是空的 这是什么意思 xff0c 我该怎么做才
  • 七、使用arcgis对道路结果进行后处理及iou优化步骤详解

    最近在研究对道路的后处理 废话不多说 直接放我的教程了 分别对real真实和predict预测的图片进行镶嵌操作 教程在这里 工具在这里 结果如下 矢量化提取道路中心线 经过很多尝试 arcscan是提取效果最好的一个方法 xff0c 操作
  • 八、使用代码对道路结果进行后处理及iou优化步骤详解

    老师又给我画了大饼 没办法 只能按照他们的想法做个尝试 上一篇的方法还没进行下去 就被叫停 又更新了一个新的想法 这里记录一下 我的尝试过程 一 图片膨胀 首先使用代码对道路进行膨胀 这里的代码 import cv2 import nump
  • add_library cannot create target “XXXX“ because another target with the same name already exists.

    CMake Error at CMakeLists txt add library cannot create target xxx because another target with the same name already exi
  • FreeRTOS的同步机制---事件event

    目录 1 event的基本概念 2 event的特点 3 event的运行机制 4 event与semphore的区别 5 event的应用场景 6 demo分析 1 event的基本概念 事件event是用于任务间通信的机制 xff0c
  • FreeRTOS同步机制--互斥信号量

    1 为什么引入互斥量 普通的信号量在任务之间同步时 xff0c 会出现优先级翻转问题 xff0c 如下 xff1a 执行流程如下 xff1a 1 xff09 此时 xff0c 任务就绪列表中 xff0c 低优先级任务L的优先级最高 xff0
  • 为何某些公司不允许使用 C++ STL

    作者 xff1a 陈甫鸼 链接 xff1a https www zhihu com question 20201972 answer 23454845 来源 xff1a 知乎 著作权归作者所有 xff0c 转载请联系作者获得授权 最初开始禁
  • 如何发明新算法(一)

    如何发明新算法 xff08 一 xff09 算法一直是计算机科学的核心 xff0c 算法改变世界 xff0c 算法创造未来 xff01 这篇文章我主要从复杂化 简单化两个方面谈谈怎么样发明一个新的算法 新算法在时间复杂度 xff0c 空间复
  • python进行http登录

    摘要 xff1a 有时需要用python做一些自动化页面请求 xff0c 但请求又需要登录权限 xff0c 好比如抢票 在有账号密码的情况下 xff0c 可以用request Session进行带session的http请求 xff0c 这
  • Ubuntu14.04 for ROS indigo的安装(电脑配置)

    前言 由于个人需要 xff0c 将笔记本电脑重新装了系统 首先用空白U盘进行系统刻盘 xff0c 然后电脑所有数据备份 xff0c 最后重新安装 装入的系统是exbot 机器人提供的Ubuntu14 04 for ros indigo xf
  • Django自带的加密算法及加密模块

    Django 内置的User类提供了用户密码的存储 验证 修改等功能 xff0c 可以很方便你的给用户提供密码服务 默认的Ddjango使用pbkdf2 sha256方式来存储和管理用的密码 xff0c 当然是可以自定义的 Django 通
  • 如何在Python中使用“ with open”打开多个文件?

    我想一次更改几个文件 xff0c 前提是我可以写入所有文件 我想知道我是否可以将多个打开调用与with语句结合with xff1a try with open 39 a 39 39 w 39 as a and open 39 b 39 39
  • 工业控制领域的期刊

    我们都知道目前做控制的大体分两大类人 xff0c 一类是做纯控制理论的 xff0c 主要是跟数学打交道 xff1b 另一类是做控制理论在各个行业的应用的 xff0c 其中包括电力系统 xff0c 机器人 xff0c 智能交通 xff0c 航
  • VNC 灰屏

    用vnc连接服务器的时候 xff0c 出现了灰屏 xff0c xff08 在xshell可以正常运行 xff09 上面会显示三个checkbox xff1a Accept clipboard from viewers Send clipbo
  • Ubuntu卸载python3.6

    注意 xff1a 这里说一下 xff0c 系统自带的python3 6可别乱删 xff0c 这个是我自己下载的python3 6 若你们有想卸载系统自带的python3 6 xff0c 可千万别去卸载 xff01 一般会开机都开不起 xff
  • 深度学习之BP神经网络

    深度学习之BP神经网络 BP xff08 Back Propagation xff09 网络是1986年由Rumelhart和McCelland为首的科学家小组提出 xff0c 是一种按误差逆传播算法训练的多层前馈网络 它的学习规则是使用最
  • 【ROS】源码分析-消息订阅与发布

    说明 本文通过NodeHandle subscribe和Publication publish 源码作为入口 xff0c 来分析PubNode SubNode之间是网络连接是如何建立的 xff0c 消息是如何发布的 xff0c topic队