ROS中的多线程
- ROS多线程
- 消息回调处理函数
- 多线程
- MultiThreadedSpinner
- AsyncSpinner
- callback传参
- timer
- timer加参数
- C++中的多线程
- Python中的多线程
ROS使用master管理节点,各节点之间可以使用同步异步多种通信方式,这种设计的思想本身其实鼓励把功能分别在多个节点实现,但是真实情况中处理复杂任务,多线程是必不可少的。
在ROS中可能使用到的线程可整体分为两种,ROS自旋处理线程,与语言库本身的线程,前者主要服务于callback数据的处理,后者处理同步或者异步的主动任务。
ros的C++接口:提供了丰富的接口。
ROS多线程
ROS提供的用于处理callback的线程机制。接口包括自旋,CallbackQueue队列处理,time callback等。
消息回调处理函数
ros::spin()和ros::spinOnce()。
两者区别在于,前者会循环,主程序运行到这里就不往下执行了,而后者只运行一次,在调用后还可以继续执行之后的程序。
如果程序含有相关的消息订阅函数,那么程序在执行过程中,除了主程序以外,ROS还会自动在后台按照规定的格式,接受订阅的消息,但是所接收到的消息并不是立刻就被处理,而是必须要等到消息回调处理函数执行的时候才被调用,这就是消息回到函数的原理。
ros::spinOnce()的用法相对来说很灵活,但往往需要考虑调用消息的时机,调用频率,以及消息池的大小,这些都要根据现实情况协调好,不然会造成数据丢包或者延迟的错误。
while(ros::ok()) {
print();
ros::spinOnce();
loop_rate.slepp();
}
多线程
MultiThreadedSpinner
在构造过程中可以指定它所用线程数,但如果不指定线程数或者线程数设置为0,它将在每个cpu内核开辟一个线程。
#include "ros/ros.h"
#include "std_msgs/String.h"
#include <boost/thread.hpp>
ros::Duration d(0.01);
class Listener
{
public:
void chatter1(const std_msgs::String::ConstPtr& msg)
{
ROS_INFO_STREAM("chatter1: [" << msg->data << "] [thread=" << boost::this_thread::get_id() << "]");
d.sleep();
}
void chatter2(const std_msgs::String::ConstPtr& msg)
{
ROS_INFO_STREAM("chatter2: [" << msg->data << "] [thread=" << boost::this_thread::get_id() << "]");
d.sleep();
}
void chatter3(const std_msgs::String::ConstPtr& msg)
{
ROS_INFO_STREAM("chatter3: [" << msg->data << "] [thread=" << boost::this_thread::get_id() << "]");
d.sleep();
}
};
void chatter4(const std_msgs::String::ConstPtr& msg)
{
ROS_INFO_STREAM("chatter4: [" << msg->data << "] [thread=" << boost::this_thread::get_id() << "]");
d.sleep();
}
int main(int argc, char **argv)
{
ros::init(argc, argv, "listener");
ros::NodeHandle n;
Listener l;
ros::Subscriber sub1 = n.subscribe("chatter", 10, &Listener::chatter1, &l);
ros::Subscriber sub2 = n.subscribe("chatter", 10, &Listener::chatter2, &l);
ros::Subscriber sub3 = n.subscribe("chatter", 10, &Listener::chatter3, &l);
ros::Subscriber sub4 = n.subscribe("chatter", 10, chatter4);
ros::MultiThreadedSpinner s(4);
ros::spin(s);
return 0;
}
AsyncSpinner
异步线程处理,同时拥有start() 和stop() 函数,并且在销毁的时候会自动停止。
#include "ros/ros.h"
#include "std_msgs/String.h"
#include <boost/thread.hpp>
ros::Duration d(0.01);
class Listener
{
public:
void chatter1(const std_msgs::String::ConstPtr& msg)
{
ROS_INFO_STREAM("chatter1: [" << msg->data << "] [thread=" << boost::this_thread::get_id() << "]");
d.sleep();
}
void chatter2(const std_msgs::String::ConstPtr& msg)
{
ROS_INFO_STREAM("chatter2: [" << msg->data << "] [thread=" << boost::this_thread::get_id() << "]");
d.sleep();
}
void chatter3(const std_msgs::String::ConstPtr& msg)
{
ROS_INFO_STREAM("chatter3: [" << msg->data << "] [thread=" << boost::this_thread::get_id() << "]");
d.sleep();
}
};
void chatter4(const std_msgs::String::ConstPtr& msg)
{
ROS_INFO_STREAM("chatter4: [" << msg->data << "] [thread=" << boost::this_thread::get_id() << "]");
d.sleep();
}
int main(int argc, char **argv)
{
ros::init(argc, argv, "listener");
ros::NodeHandle n;
Listener l;
ros::Subscriber sub1 = n.subscribe("chatter", 10, &Listener::chatter1, &l);
ros::Subscriber sub2 = n.subscribe("chatter", 10, &Listener::chatter2, &l);
ros::Subscriber sub3 = n.subscribe("chatter", 10, &Listener::chatter3, &l);
ros::Subscriber sub4 = n.subscribe("chatter", 10, chatter4);
ros::AsyncSpinner s(4);
s.start();
ros::Rate r(5);
while (ros::ok())
{
ROS_INFO_STREAM("Main thread [" << boost::this_thread::get_id() << "].");
r.sleep();
}
return 0;
}
## 自定义队列处理线程
#include "ros/ros.h"
#include "ros/callback_queue.h"
#include "std_msgs/String.h"
#include <boost/thread.hpp>
void chatterCallbackMainQueue(const std_msgs::String::ConstPtr& msg)
{
ROS_INFO_STREAM("I heard: [ " << msg->data << "] in thread [" << boost::this_thread::get_id() << "] (Main thread)");
}
void chatterCallbackCustomQueue(const std_msgs::String::ConstPtr& msg)
{
ROS_INFO_STREAM("I heard: [ " << msg->data << "] in thread [" << boost::this_thread::get_id() << "]");
}
ros::CallbackQueue g_queue;
void callbackThread()
{
ROS_INFO_STREAM("Callback thread id=" << boost::this_thread::get_id());
ros::NodeHandle n;
while (n.ok())
{
g_queue.callAvailable(ros::WallDuration(0.01));
}
}
int main(int argc, char **argv)
{
ros::init(argc, argv, "listener_with_custom_callback_processing");
ros::NodeHandle n;
ros::SubscribeOptions ops = ros::SubscribeOptions::create<std_msgs::String>("chatter", 1000,
chatterCallbackCustomQueue,
ros::VoidPtr(), &g_queue);
ros::Subscriber sub = n.subscribe(ops);
ros::Subscriber sub2 = n.subscribe("chatter", 1000, chatterCallbackMainQueue);
boost::thread chatter_thread(callbackThread);
ROS_INFO_STREAM("Main thread id=" << boost::this_thread::get_id());
ros::Rate r(1);
while (n.ok())
{
ros::spinOnce();
r.sleep();
}
chatter_thread.join();
return 0;
}
callback传参
#include "ros/ros.h"
#include "std_msgs/String.h"
class Listener
{
public:
ros::NodeHandle node_handle_;
ros::V_Subscriber subs_;
Listener(const ros::NodeHandle& node_handle): node_handle_(node_handle)
{
}
void init()
{
subs_.push_back(node_handle_.subscribe<std_msgs::String>("chatter", 1000, boost::bind(&Listener::chatterCallback, this, _1, "User 1")));
subs_.push_back(node_handle_.subscribe<std_msgs::String>("chatter", 1000, boost::bind(&Listener::chatterCallback, this, _1, "User 2")));
subs_.push_back(node_handle_.subscribe<std_msgs::String>("chatter", 1000, boost::bind(&Listener::chatterCallback, this, _1, "User 3")));
}
void chatterCallback(const std_msgs::String::ConstPtr& msg, std::string user_string)
{
ROS_INFO("I heard: [%s] with user string [%s]", msg->data.c_str(), user_string.c_str());
}
};
int main(int argc, char **argv)
{
ros::init(argc, argv, "listener_with_userdata");
ros::NodeHandle n;
Listener l(n);
l.init();
ros::spin();
return 0;
}
timer
#include "ros/ros.h"
void callback1(const ros::TimerEvent&)
{
ROS_INFO("Callback 1 triggered");
}
void callback2(const ros::TimerEvent&)
{
ROS_INFO("Callback 2 triggered");
}
int main(int argc, char **argv)
{
ros::init(argc, argv, "talker");
ros::NodeHandle n;
ros::Timer timer1 = n.createTimer(ros::Duration(0.1), callback1);
ros::Timer timer2 = n.createTimer(ros::Duration(1.0), callback2);
ros::spin();
return 0;
}
timer加参数
#include "ros/ros.h"
#include "cstring"
#include "iostream"
#include "han_agv/SocketClient.h"
#include "han_agv/VelEncoder.h"
using namespace std;
const float_t PI = 3.14159;
const int ENCODER_PER_LOOP = 900;
const float_t WHEEL_RADIUS = 0.075;
ClientSocket soc;
void timerCallback(const ros::TimerEvent&, ros::Publisher vel_pub)
{
ROS_INFO("Timer callback triggered");
AGVDATA buffer;
AGVSENSORS buffer_left_encoder_start;
AGVSENSORS buffer_right_encoder_start;
double time_delay = 0.1;
ros::Duration(time_delay).sleep ();
buffer.SensorsData.WheelLeft_Encoder[0] = buffer.SensorsData.WheelLeft_Encoder[3];
buffer.SensorsData.WheelLeft_Encoder[1] = buffer.SensorsData.WheelLeft_Encoder[2];
buffer.SensorsData.WheelRight_Encoder[0] = buffer.SensorsData.WheelRight_Encoder[3];
buffer.SensorsData.WheelRight_Encoder[1] = buffer.SensorsData.WheelRight_Encoder[2];
han_agv::VelEncoder vel;
vel.left_velocity = 1.0;
vel.right_velocity = 2.0;
vel_pub.publish(vel);
}
int main(int argc, char** argv)
{
ros::init(argc, argv, "TCPDecode_node");
ros::NodeHandle nh;
ROS_INFO("create node successfully!");
ros::Publisher vel_pub = nh.advertise<han_agv::VelEncoder>("HansAGV/encoder_vel", 1);
ros::Timer decoder_timer = nh.createTimer(ros::Duration(0.1), boost::bind(timerCallback, _1, vel_pub));
ros::spin();
}
C++中的多线程
简单使用
#include <thread>
#include <iostream>
void foo() {
std::cout << "Hello C++11" << std::endl;
}
int main() {
std::thread thread(foo);
thread.join();
return 0;
}
传入参数
#include <thread>
#include <iostream>
void hello(const char *name) {
std::cout << "Hello " << name << std::endl;
}
int main() {
std::thread thread(hello, "C++11");
thread.join();
return 0;
}
类成员函数作为线程入口
#include <thread>
#include <iostream>
class Greet
{
const char *owner = "Greet";
public:
void SayHello(const char *name) {
std::cout << "Hello " << name << " from " << this->owner << std::endl;
}
};
int main() {
Greet greet;
std::thread thread(&Greet::SayHello, &greet, "C++11");
thread.join();
return 0;
}
- join:当前线程释放资源,优先执行join线程,等待join线程执行完毕,释放资源后前线程继续执行。
- yield: 当前线程放弃执行,操作系统调度另一线程继续执行。
- get_id: 获取线程 ID。
- sleep_until: 线程休眠至某个指定的时刻(time point),该线程才被重新唤醒。
void sleep_until( const std::chrono::time_point<Clock,Duration>& sleep_time );
- sleep_for: 线程休眠某个指定的时间片(time span),该线程才被重新唤醒,不过由于线程调度等原因,实际休眠时间可能比 sleep_duration 所表示的时间片更长。
Python中的多线程
python中的多线程是特殊的,经常提到伪线程的概念,主要因为没有真正提供并行处理,没有发挥出多核的运算优势。
为什么没有提供真正并行的线程运算?实则是为了解决资源抢占问题,为了解决问题,使用了GIL,GIL 的全名是 the Global Interpreter Lock (全局解释锁),是常规 python 解释器(当然,有些解释器没有)的核心部件。线程锁的概念应该都不陌生,GIL同样是用于保护 Python 内部对象的全局锁(在进程空间中唯一),保障了解释器的线程安全。
但很显然也带来了一个问题:每个时刻只有一条线程在执行,即使在多核架构中也是如此——毕竟,解释器只有一个。这就是伪线程的由来。
如何实现真正的并行,多进程,但是同时多进程又带来另一些问题:
- 进程之间不能共享内存,但线程之间共享内存非常容易。
- 操作系统在创建进程时,需要为该进程重新分配系统资源,相比创建线程的代价则小得多。
所以,还是老实先用多线程吧。
import threading
import time
from datetime import datetime
class MyThread(threading.Thread):
def __init__(self, id):
threading.Thread.__init__(self)
self.id = id
def run(self):
time.sleep(5)
print "子线程动作",threading.current_thread().name, datetime.now()
if __name__ == "__main__":
t1 = MyThread(999)
t1.setDaemon(True) # 添加守护线程!
t1.start()
t1.join() # 添加join函数!
for i in range(5):
print "主线程动作",threading.current_thread().name, datetime.now()
部分转载出处:
原文链接:https://blog.csdn.net/yaked/article/details/50776224
菜鸟教程:https://www.runoob.com/w3cnote/cpp-std-thread.html
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)