ROS中使用A星算法进行路径规划

2023-05-16

ROS中使用A星算法进行路径规划

  • 前言
  • python编写A星算法
    • 描述
    • 具体实现
  • ROS结合A星算法
    • 描述
    • 具体实现
  • 实现结果

前言

由于本人这学期修了一门《智能工程》的课程,课程的大作业要求大家自主编写机器人导航程序,包括:路径规划、轨迹规划、控制器、机器人建模等相关程序,因此特在此写下一篇博客,记录开发过程。

这是第一篇博客,用于记录如何将自己编写的A星算法用于ROS中

python编写A星算法

描述

先声明:在使用Python编写A*算法的过程中,深深的体会到了C++的链表、指针的便利性,但奈何已经1年多没使用C++写过程序了,所以还是用了Python来完成的。我编写的A星算法的时间复杂度和空间复杂度肯定都很,希望大家看了轻喷。

由于要用在ROS的map中寻找路径,所以我把A星算法写成了一个类,只需要传进去三个参数,一个是二维的ListROS中规定:可走区域的数值为0,障碍物数值为100,未知领域数值为-1),一个是起始点的坐标,一个是终点的坐标

最后程序会返回一个List,里面的内容是从终点到起点的最短路径,用每一个点的坐标表示。

比如下面是一张地图:

01234567
000000000
10001000000
20001000000
30001000000
40001000000
500000000
600000000

规定起点为[2,2],终点为[2,4],则返回的内容为:[(2,4),(1,4),(0,3),(1,2),(2,2)]

具体实现

对于地图中每一个像素点,如果这个像素点可到达,那就计算当前的G代价(self.cost_g),H代价(self.cost_h),F代价(self.cost_f)

这个类,相当于C++中的struct

class map_node():
    def __init__(self):
    	# x、y代表该像素点在地图中的横纵坐标
        self.x = 0
        self.y = 0
        # F = G + H
        self.cost_f = 0
        self.cost_g = 0
        self.cost_h = 0
        # 父节点的横纵坐标
        self.parent = [0,0]

主要类:

class find_path():

类初始化函数:

  • extend_map函数是将地图扩展一圈,加一圈障碍物(像素值为1)

  • 声明一个state_map用于保存map中每一个像素点的状态:

    • 2代表已经在open表中
    • 3代表已经在close表中
    • 0代表还没有处理过
  • 起点和终点自然横纵坐标都+=1

  • 初始化一些变量 openlist、closelist

    def __init__(self, map, start, goal):
        # map是一个二维地图, start是起点坐标[],goal是终点坐标[]
        self.map = self.extend_map(map)
        # 2代表在open表中 3代表在close表中
        self.state_map = np.zeros([len(map) + 2, len(map[0]) + 2])
        # print self.map

        self.start = start
        self.start[0] += 1
        self.start[1] += 1
        self.goal = goal
        self.goal[0] += 1
        self.goal[1] += 1
        self.open_list = []
        self.cloase_list = []
        self.path = []
        self.if_reach = False

扩展地图边界的函数

      def extend_map(self, map):
        new_row = np.ones(len(map[0]))
        new_col = np.ones(len(map) + 2)
        x = np.insert(map, 0, new_row, axis=0)
        x = np.insert(x, len(map) + 1, new_row, axis=0)
        x = np.insert(x, 0 , new_col, axis=1)
        x = np.insert(x, len(map[0]) + 1 , new_col, axis=1)
        return x

主要的寻找路径的函数

  • 如果设置的起始点和终点不可达,print 提示,然后直接退出
  • append_around_open函数是把该点周围的8个点加到open表中
  • 把起始点加到close表中
  • 进入循环,利用find_min_cost_f每次寻找open表中最小的cost_f,把它周围的8个点加到open表中,然后把这个节点加到close表,直到找到了到终点的路径,然后利用append_path函数回溯父节点直到回溯到起点。
    def start_find(self):
        #第一次操作,把起点的周围的点指向起点,起点和周围的点加到open list,
        # print "-----start point-----",self.start
        if self.map[self.start[0]][self.start[1]] != 0:
            print "\033[0;31m[E] : Please set the valid start point\033[0m"
            print "value = ", self.map[self.start[0]][self.start[1]]
            return "None"
        if self.map[self.goal[0]][self.goal[1]] != 0:
            print "\033[0;31m[E] : Please set the valid goal point\033[0m"
            return "None"
        self.append_around_open(self.start, cost_g=0)

        # 把起始节点加到close_list
        temp = map_node()
        temp.x = self.start[0]
        temp.y = self.start[1]
        self.append_close(temp)
        while True:
            # print "-----"
            min_cost, index_min_cost = self.find_min_cost_f()
            current_node = self.open_list[index_min_cost]
            # print current_node.x
            # 如果最小的节点正好等于终点
            # print current_node.x, current_node.y
            # print self.state_map
            # time.sleep(1)
            if current_node.x == self.goal[0] and current_node.y == self.goal[1]:
                self.append_path(current_node)
                break
            self.append_around_open([current_node.x, current_node.y], cost_g=current_node.cost_g)
            # 加到close list
            self.append_close(current_node)
            self.open_list.remove(current_node)
        return self.path

利用循环遍历该点周围的8个点
注意:如果cost_f有更小的,要更新cost_f和父节点

        def append_around_open(self, coordinate, cost_g):
        for i in [-1, 0, 1]:
            for j in [-1, 0, 1]:
                if i == 0 and j == 0:
                    continue
                if self.map[coordinate[0] + i][coordinate[1] + j] == 0 \
                        and self.state_map[coordinate[0] + i][coordinate[1] + j] != 3:
                    temp = map_node()
                    # 计算G和H代价
                    temp.cost_g = 10 + cost_g
                    temp.cost_h = (abs(self.goal[0] - (coordinate[0] + i)) + abs(self.goal[1] - (coordinate[1] + j))) * 10
                    temp.cost_f = temp.cost_g + temp.cost_h
                    temp.x = coordinate[0] + i
                    temp.y = coordinate[1] + j
                    #链接父节点
                    temp.parent[0] = coordinate[0]
                    temp.parent[1] = coordinate[1]
                    # print "temp", temp
                    if self.state_map[coordinate[0] + i][coordinate[1] + j] == 2:
                        current_index = self.find_index(coordinate[0] + i, coordinate[1] + j)
                        # 如果之前的cost比现在的cost大,就替换父节点和cost
                        if self.open_list[current_index].cost_f > temp.cost_f:
                            self.open_list[current_index] = temp
                    else:
                        self.state_map[coordinate[0] + i][coordinate[1] + j] = 2
                        # 加入open list
                        self.open_list.append(temp)

回溯父节点,然后把每一个节点的坐标append到self.path这变量中

    # 最后找到终点,把最短路径append到path里
    def append_path(self, node):
        while True:
            self.path.append([node.x - 1, node.y - 1])
            if node.x == self.start[0] and node.y == self.start[1]:
                break
            current_index = self.find_close_index(node.parent[0], node.parent[1])
            # print "----------------", current_index
            # print self.cloase_list
            node = self.cloase_list[current_index]

寻找open表中的最小的cost_f

    # 寻找open表中的最小代价节点和index
    def find_min_cost_f(self):
        # 记录最小花费和其在openlist中的下标
        # print "--------------------------------one time----------------"
        min_cost = 100000
        index_min_cost = 0
        for i in range(len(self.open_list)):
            # print self.open_list[i].cost_f, min_cost
            if self.open_list[i].cost_f < min_cost:
                min_cost = self.open_list[i].cost_f
                index_min_cost = i
        return min_cost, index_min_cost

其它函数

    def find_close_index(self, x, y):
        for i in range(len(self.cloase_list)):
            if self.cloase_list[i].x == x and self.cloase_list[i].y == y:
                return i

    def find_index(self, x, y):
        for i in range(len(self.open_list)):
            if self.open_list[i].x == x and self.open_list[i].y == y:
                return i
  
    def append_close(self, node):
        # 更改节点状态
        self.state_map[node.x][node.y] = 3
        self.cloase_list.append(node)

ROS结合A星算法

描述

由于本人ROS学的并不是很深入,目前只掌握了topic这一种消息传输机制,因此,程序中只包括利用topic传输消息。

通过ROS中的Topic订阅"/map"、"/initialpose"、"/move_base_simple/goal"这三个话题,然后通过计算,将path发布出去("/path_my_A_star")

具体实现

类初始化函数

  • “/path_my_A_star"是未进行平滑处理的路径,有直角等,”/path_my_A_star_changed"是平滑处理的
    def __init__(self):
        rospy.init_node("path_pub")
        self.path_pub = rospy.Publisher("/path_my_A_star", Path, queue_size=15)
        self.path_pub_changed = rospy.Publisher("/path_my_A_star_changed", Path, queue_size=15)

        # 关于地图的一些变量
        self.origin_x = 0
        self.origin_y = 0
        self.resolution = 0
        self.width = 0
        self.height = 0
        # self.map_test_pub = rospy.Publisher("/map_test", OccupancyGrid, queue_size=15)
        self.map_sub = rospy.Subscriber("/map", OccupancyGrid, self.map_callback)
        self.current_path = Path()
        self.current_path_changed = Path()
        rospy.sleep(1)
        # 起始点和目标点
        self.start_map_point = []
        self.goal_map_point = []
        # 地图上的路径
        self.path_map = []
        self.path_world = []
        # 是否要寻找路径的开关
        self.if_start_find_path = False
        self.goal_pose = PoseStamped()
        self.init_pose = PoseWithCovarianceStamped()
        self.init_pose_sub = rospy.Subscriber("/initialpose", PoseWithCovarianceStamped, self.init_pose_callback)
        self.goal_pose_sub = rospy.Subscriber("/move_base_simple/goal", PoseStamped, self.goal_pose_callback)
        self.last_time = rospy.get_rostime()
        self.start_find_path()
        rospy.Rate(1)
        rospy.spin()

起始点回调函数

    def init_pose_callback(self, msg):
        # print "===========get initial pose================"
        self.init_pose = msg
        # print msg
        # print "----------------worldtomap------------------"
        self.start_map_point =  self.WorldTomap(msg.pose.pose.position.x, msg.pose.pose.position.y)
        print "----------------start point----------------",self.start_map_point
        print "value = ", self.map[self.start_map_point[0]][self.start_map_point[1]]
        if self.start_map_point == [-1, -1]:
            print "\033[0;31m[E] : Please set the valid goal point\033[0m"

终点回调函数

   def goal_pose_callback(self, msg):
        self.path_map = []
        self.goal_pose = msg
        self.if_start_find_path = True
        # print msg
        self.goal_map_point =  self.WorldTomap(msg.pose.position.x, msg.pose.position.y)
        print "-----------------goal point---------------",self.goal_map_point
        if self.goal_map_point == [-1, -1]:
            print "\033[0;30m[Kamerider E] : Please set the valid goal point\033[0m"
            return
        else:
            self.start_find_path()

地图回调函数

    def map_callback(self, msg):
        print msg.header
        print "------"
        print msg.info
        print "------"
        print len(msg.data)
        # 初始化map里的参数值
        self.origin_x = msg.info.origin.position.x
        self.origin_y = msg.info.origin.position.y
        self.resolution = msg.info.resolution
        self.width = msg.info.width
        self.height = msg.info.height
        print "-------",self.width
        raw = np.array(msg.data, dtype=np.int8)
        self.map = raw.reshape((self.height, self.width))
        self.map_sub.unregister()

将起始点和终点的topic里的World坐标系的数值转化为map上的像素坐标,用于world坐标系映射到map下的离散的像素点

    def WorldTomap(self, wx, wy):
        # 返回-1,-1就是有问题
        # print wx, wy
        # print self.origin_x, self.origin_y
        if wx < self.origin_x or wy < self.origin_y:
            # print "<<<<<<<"
            return [-1, -1]
        mx = (int)((wx - self.origin_x) / self.resolution)
        my = (int)((wy - self.origin_y) / self.resolution)
        if mx < self.width and my < self.height:
            # print ">>>>>>>>>>>"
            return [my, mx]
        return [-1, -1]

调用A星算法寻找路径

    def start_find_path(self):
        if self.if_start_find_path:
            print ('\033[0;32m[I] : Start find path with A* \033[0m')
            temp = A_star.find_path(self.map, self.start_map_point, self.goal_map_point)
            self.path_map = temp.start_find()
            print self.path_map
            self.publisher_path()
        else:
            rospy.sleep(1)
            print ('\033[0;33m[W] : Please set goal pose\033[0m')
            return

发布两种不同的路径,一种是原始路径,一种是利用插值平滑处理过的

    def publisher_path(self):
        time = 1
        y1 = []
        y2 = []
        for i in range(len(self.path_map)):
            current_time = rospy.get_rostime()
            current_pose = PoseStamped()
            current_pose.pose.position.x, current_pose.pose.position.y= self.mapToWorld(self.path_map[i][1], self.path_map[i][0])
            y1.append(self.mapToWorld(self.path_map[i][1], self.path_map[i][0])[0])
            y2.append(self.mapToWorld(self.path_map[i][1], self.path_map[i][0])[1])
            current_pose.pose.position.z = 0.0
            current_pose.pose.orientation.x = 0.0
            current_pose.pose.orientation.y = 0.0
            current_pose.pose.orientation.z = 0.0
            current_pose.pose.orientation.w = 1.0
            time += 1
            self.current_path.header.stamp = current_time
            self.current_path.header.frame_id = "odom"
            self.current_path.poses.append(current_pose)
            self.path_pub.publish(self.current_path)
            self.last_time = current_time


        # 通过差值做平滑处理
        length = len(self.path_map)
        x = np.array([num for num in range(length)])
        xnew = np.arange(0,length - 1, 0.1)
        func1 = interpolate.interp1d(x, y1, kind='cubic')
        func2 = interpolate.interp1d(x, y2, kind='cubic')
        ynew1 = func1(xnew)
        ynew2 = func2(xnew)
        for i in range(len(ynew1)):
            current_time = rospy.get_rostime()
            current_pose = PoseStamped()
            current_pose.pose.position.x = ynew1[i]
            current_pose.pose.position.y = ynew2[i]
            current_pose.pose.position.z = 0.0
            current_pose.pose.orientation.x = 0.0
            current_pose.pose.orientation.y = 0.0
            current_pose.pose.orientation.z = 0.0
            current_pose.pose.orientation.w = 1.0
            time += 1
            self.current_path_changed.header.stamp = current_time
            self.current_path_changed.header.frame_id = "odom"
            self.current_path_changed.poses.append(current_pose)
            self.path_pub_changed.publish(self.current_path_changed)
            self.last_time = current_time

实现结果

设置起点
在这里插入图片描述
设置终点
在这里插入图片描述
生成的路径
在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

ROS中使用A星算法进行路径规划 的相关文章

  • Darknet YoloV4编译+训练(避免踩坑)

    AlexAB darknet yolov4编译 43 训练 时间间隔好几天今天来更新一下yolov4的训练 训练篇 在训练之前需要对大佬的源码进行编译本本机编译 xff0c 编译过程可查看下述链接 xff1a https blog csdn
  • ubuntu下如何创建ros工作空间、创建ros功能包、创建ros节点

    1 打开终端进入存放ros工作空间的目录 xff08 比如我这里将它放在home目录下的test文件夹中 xff09 cd test 2 开始创建ros工作空间 mkdir p catkin ws src cd catkin ws src
  • STM32串口通信 (采用链表接收不定长数据帧)

    STM32串口通信 链表接收不定长数据帧 数据帧说明不太恰当的比方 数据缓冲链表结构效果展示工程文件 数据帧说明 STM32数据寄存器为USARTx gt DR寄存器 可以看到DR寄存器只有 8 0 位可以使用 xff0c 第8位用于奇偶校
  • A*寻路算法

    目录 1 动画演示2 游戏中的自动寻路A 算法3 A 寻路算法原理4 调试代码分析代码5 代码 1 动画演示 2 游戏中的自动寻路A 算法 随着3D游戏的日趋流行 在复杂的3D游戏环境中如何能使非玩家控制角色准确实现自动寻路功能成为了3D游
  • 2022数学建模国赛B题和C题高质量论文代码数据

    目录 B题论文 5 1 问题一的建模与求解 5 1 1 使用极坐标求解具体位置 C题论文 1 1 研究背景 1 2 问题的提 5 1 问题一的建模与求解 5 1 1 数据的预处理 B题论文 5 1 问题一的建模与求解 5 1 1 使用极坐标
  • stm32小白学习之寄存器名称

    IDR输入只读寄存器 xff0c ODR输出可读可写寄存器 BSRR xff08 置位寄存器 xff09 与BRR xff08 复位寄存器 xff09 CRL xff08 端口配置低位寄存器 xff09 与CRH xff08 端口配置高位寄
  • 使用Vite创建Vue3+TS项目并整合Element Plus框架等一条龙服务

    记录一下使用Vite创建Vue3 43 TS项目以及整合Element Plus框架 xff0c 还有Less Pinia Vue router monaco editor等插件或组件 一 使用Vite创建Vue3 43 TS项目 第一步
  • Qt学习 第37节:QString

    在阅读QString文档时 xff0c 出了一个词 implicit sharing copy on write xff0c 不是很懂 xff0c 下面链接解释的表明白 QT的隐式共享 Implicit Sharing 道路与梦想 CSDN
  • 下载Postman并且汉化使用

    下载Postman并且汉化使用 一 下载postman postman有不同的版本 xff0c 如果要汉化就要下载的版本与汉化包一致 下载地址 xff1a postman官网下载地址 xff1a https www postman com
  • 【Vue】postman汉化教程 保姆级教程 包教会

    下载链接 xff1a Win64 Win32 历史版本下载 请把下面链接的 34 版本号 34 替换为指定的版本号 xff0c 例如 xff1a 8 8 0 版本链接Windows32位https dl pstmn io download
  • 操作系统实验——进程与线程

    目录 1 使用GCC xff08 1 xff09 参数 xff08 2 xff09 自定义头文件 xff08 3 xff09 makefile脚本 xff08 4 xff09 gdb调试 2 进程 xff08 1 xff09 新建进程 xf
  • 串口应用(USART)

    串行口应用 1 USART介绍 通用同步异步收发器 USART 提供了一种灵活的方法与使用工业标准NRZ异步串行数据格式的 外部设备之间进行全双工数据交换 USART利用分数波特率发生器提供宽范围的波特率选择 它支持同步单向通信和半双工单线
  • cpp-httplib 避免阻塞主线程, c++封装httplib,httplib面向对象开发

    目录 说明 前言原生的httplib会阻塞你的主线程解决httplib阻塞主线程的问题BashController 面向对象风格使用httplib自定义controller MyController h文件自定义controller Tes
  • 数据结构——结构体的5种定义方式及对比

    以下仅为定义结构体的方式 xff0c 具体使用在后续的文章中介绍 span class token macro property span class token directive hash span span class token d
  • 【C++】STL应用(详解)

    一 泛型程序与STL1 泛型程序设计的基本概念2 STL简介 二 迭代器1 输入流迭代器2 输出流迭代器 三 STL应用1 撰写自己的算法和函数 xff0c 结合容器和迭代器解决序列变换 xff08 如取反 平方 立方 xff09 xff0
  • 模块学习(二)——MPU6050

    去年电赛备赛期间 xff0c 学的STM32标准库 xff0c 那一整个繁琐直接给我劝退了 xff0c 当时学习MPU6050时就非常痛苦 xff0c 代码也看不懂 xff0c 无非抄来抄去 xff0c 然后就是编译 xff0c 改错 xf
  • 【嵌入式面试题】常见的面试题梳理一

    注 xff1a 看面试题时 xff0c 主要应该以学习为主 xff0c 面试题有些基本上是我们编程时会遇上的问题 xff0c 通过学习面试题会提升我们的编程意识和解决一些日常我们编程所遇到的问题 xff0c 看完这篇面试题后 xff0c 希
  • 使用Python+openpyxl实现导出自定义样式的Excel文件

    之前项目中的导出Excel文件操作都是在前端完成的 xff0c 项目是由vue 43 vite构建的 xff0c 效果还不错的 xff0c 所需依赖包如下所示 npm i xlsx 64 0 18 5 npm i xlsx style vi
  • 【嵌入式面试题】常见面试题梳理二

    注 xff1a 看面试题时 xff0c 主要应该以学习为主 xff0c 面试题有些基本上是我们编程时会遇上的问题 xff0c 通过学习面试题会提升我们的编程意识和解决一些日常我们编程所遇到的问题 xff0c 看完这篇面试题后 xff0c 希
  • 【嵌入式面试题】常见面试题梳理三

    注 xff1a 看面试题时 xff0c 主要应该以学习为主 xff0c 面试题有些基本上是我们编程时会遇上的问题 xff0c 通过学习面试题会提升我们的编程意识和解决一些日常我们编程所遇到的问题 xff0c 看完这篇面试题后 xff0c 希

随机推荐