A*算法-Python实现

2023-05-16

好久没有在CSDN上发文章了,快一年了吧。这两天重新登录了一下,不看不知道,一看吓一跳,没想到访问量快13万了。

之前写博客的时候,想着把一些有用的东西写下来,一方面是当做笔记了,免得以后忘记了;另外一方面是分享用。可是后来没想到,CSDN首页上的越来越多的标题党文章,没有一点营养,所以就愈发觉得厌恶。慢慢的就没有继续发文章了。虽然CSDN上好久没发文章了,但其实我还是坚持在写技术博客的,不过主要都是发到了自己的博客网站上去,这一年多写了40万字了。

好久没有登录CSDN了,今天登录突然看到访问量一下从几千变成13万了,还是颇感欣慰的。相比于没意思的标题党文章,我觉得还是更加有知识的文章才更加有用。脑子里装着有用的知识总比装着垃圾要好。

所以想想未来还是同步一下两边的文章吧。首先我自己的博客特意没有做Google和百度SEO,免得流量被打爆了。所以以后CSDN上只发文章,不准备引流到我的博客网站。其次,未来准备发的东西的话,先把这一年写的上百篇文章慢慢同步,然后看情况再看看要写些什么技术博客。

目前就说这么多吧,下面就是这篇新的文章。我在后台看我的A算法实现的那个文章阅读量很高,但其实那个文章实现的不是很好,开销比较大。所以这次就把后来我重复实现的A算法的文章上传吧。




本文主要提供了Python版本的A*路径规划算法实现,并在此基础上提供了命令行和基于Matplotlib的GUI用户界面(User Interface)

完整效果

A*算法-Python实现

路径规划算法是计算机届中非常重要的算法,在不少领域都能够看到路径规划算法的应用,最直接的应用例如打车软件为你规划出路径、迷宫问题的求解等等;而复杂一些的、不直接的应用如六自由度机械臂在空间中运动轨迹的规划可以看做是在六维空间的路径规划、火星车在火星表面的探索等等都可以视为路径规划问题,从而使用路径规划算法进行求解。

路径规划算法大体上可以分为两类:静态路径规划以及动态路径规划。所谓静态静态路径规划指的是提前已知全局地图的前提下进行路径规划;而动态路径规划则指的是预先并不知道全局地图或者仅知道全局地图的一部分,在此基础上对全局地图进行探索的同时动态的利用获取到的信息指导路径规划,最终达到终点。在我们的认知上,一般当然是动态路径规划算法会好很多,因为我们往往并不知道全局地图,而且对于我们人类来说,在一个陌生环境下进行路径规划、到达终点更是我们所希望的。然而在现实中在一些场景中确实全局地图是可以提前获得的,因此针对这些场景,开发出来资源占用更少、速度更快、路径更优的算法就还是有必要的。

其实动态路径规划更进一步,在探索的构建出来全局地图,那就成了SLAM问题了,即同时定位与建图(Simultaneous Localization And Mapping,SLAM)。关于SLAM本文就不再深入了。

本文将要介绍的A*算法就是属于静态路径规划算法的其中一种。在诸多的静态路径规划算法里,A*算法可以和暴力搜索(广度优先、深度优先)一样,获得最优的路径,并且资源消耗远远小于广度优先和深度优先。因此有广泛的应用。

本文将在介绍A*算法的基础上提供一种A*算法的Python实现,并在此基础上讲解实现的代码

0. 前言

其实我现在是一个大二的计算机系学生,最近数据结构课程的配套实验要求学生从十四个题中选择四个来做。本篇博客就是对应的迷宫问题的求解代码。求解问题比较简单,但是这样就太没有意思了,所以在实现文本字符的迷宫求解基础上,我想为自己带来一些挑战,增加一些趣味性与难度。

因此在基础的求解迷宫问题的要求上,我利用Matplotlib中的Interactive Plotting Method将路径规划的过程以GUI的形式表达出来,并在此基础上提供地图绘制、地图保存等一系列功能,最后完成一个不错的程序。

数据结构算法实验

1. A*算法

其实关于A*算法并没有什么好说的,网上不少博客对A*算法都有了非常深入的介绍,因此我这里就不在赘述了,避免班门弄斧。

而对于A*算法,需要理解的关键之处就是在启发值的运用。启发值描述了当前点到终点的估计距离,因此相比于深度有限和广度优先,A*算法每次都会去寻找离终点距离比较近的点作为下一个点,从而避免了漫无目的、低效率的遍历。

此外,算法中的OpenList和CloseList其实指的就是已探索区域的边缘和已经探索区域的内部,因此每次进行下一次探索的时候从OpenList里面找出来点作为下一个探索的点即可。而一旦这个点被探索完毕,将其加入到CloseList中即可,即表示当前点已经被考虑过。

最后算法的结束条件就是当终点已经在CloseList中了,表示已经找到了一条路径,此时对CloseList中的点进行回溯即可找到路径,因为CloseList中的探索区域已经蕴含了路径的信息;此外还要注意找不到路径时候的结束条件,避免找不到路径而陷入死循环。

上面是我对于A*算法的理解,如果需要学习A*算法的话,其实B站上、知乎上有很多不错的讲解。这里就贴出来百度百科和维基百科的介绍

From Wikipedia

A* (pronounced “A-star”) is a graph traversal and path search algorithm, which is often used in many fields of computer science due to its completeness, optimality, and optimal efficiency. One major practical drawback is its O ( b d O(b^{d} O(bd) space complexity, as it stores all generated nodes in memory. Thus, in practical travel-routing systems, it is generally outperformed by algorithms which can pre-process the graph to attain better performance, as well as memory-bounded approaches; however, A* is still the best solution in many cases.

维基百科上对A*算法的介绍

From BaiduBaike

A*算法,A*(A-Star)算法是一种静态路网中求解最短路径最有效的直接搜索方法,也是解决许多搜索问题的有效算法。算法中的距离估算值与实际值越接近,最终搜索速度越快。

百度百科上对A*算法的介绍

2. 实现思路

针对不同的功能,整个程序的实现,主要分为三个层次(软件分层的思想):

  1. 核心层:负责GUI的显示(地图的绘制与更新)、地图的加载与保存、键盘鼠标等事件对应的回调函数(信号处理函数)的绑定
  2. 算法实现层:负责A*算法的实现
  3. 用户界面层:负责提供命令行用户界面,从而实现高级感(逼格)

1. 核心层

核心层的具体任务为维护全局地图以及进行地图的保存,因此代码也是围绕这两大内容进行编写的。

1. 地图的维护

对于路径规划问题,其核心维护对象就是全局地图地图,因为地图的保存、与加载;GUI的显示以及路径规划的实现都需要地图上区域的状态,因此在核心层首先需要实现的的就是地图。

  • 首先是关于地图的表示。对于地图本身而言,由于目前进行的是二维的路径规划,因此可以利用一个二维数组来表示。
  • 其次是地图在磁盘上的表示。为了方便保存之后方便读取、加载以及人类的理解,可以将二维数组保存为csv文件从而序列化到磁盘上,而在加载地图的时候再从磁盘上反序列化csv文件到内存,从而读取出来地图。
  • 最后是地图中障碍物的表示。由于地图中的区域会有多种状态,例如是否为障碍物(是否可行),其次在进行A*路径规划的时候也会为地图中的区域标记其状态(是否已经考虑过)。因此针对地图中不同的状态的区域,使用不同的数值表示当前区域的状态,例如inf表示障碍物,3表示最终规划得到的路径。

2. 地图的显示

由于地图的显示涉及到图形的显示,因此需要了解一下图形在计算机中的显示。

图形的显示有两种方式,分别是矢量图以及光栅图。

  • 对于矢量图,其内部的图形是由基本的几何图形构成的,因此在矢量图文件(例如.dot)文件中,只需要按照指定的格式描述图形的属性即可。例如Windows的.dot文件,我们按照其要求的语法对图形进行描述,然后对应的解析该语法,利用图形学的算法即可渲染出来图形,我们常见的PDF、SVG等等文件都是矢量图。例如下面的例子

矢量图的一种:dot文件

  • 而对于光栅图,其基本构成单位是像素,每个像素大小只有零点几个毫米,而每平方厘米都会有几百几千个像素。而每个像素都会有自己的数值,表现出来自己的颜色。因此我们通过几万个极小的、在人眼看来是点的像素组合起来就成了我们所看到的图像。因此这种图像又称为位图、点阵图、像素图。我们常见的JPG、PNG、BMP等等都是光栅图,例如下面的例子

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xSv4Dmap-1669722993662)(https://bkimg.cdn.bcebos.com/pic/54fbb2fb43166d22356091ba4b2309f79052d28d?x-bce-process=image/watermark,image_d2F0ZXIvYmFpa2U4MA==,g_7,xp_5,yp_5/format,f_auto)]

矢量图最大的好处就在于由于其图形是完全利用数学公式描述的(例如圆相对于屏幕左上角的相对距离、圆相对于屏幕宽度的半径等等),因此针对不同的屏幕、不同的放大背书,矢量图片完全可以通过运算进行放大、适配。因此在我们用户看来就是矢量图是无论怎样放大边缘都不会模糊。例如下面的pdf

放大2.5倍的PDF文件

aas

而光栅图的基本单位是像素,因此再怎么放大,像素都是不会变的,因此在放大一定的倍数就会模糊,这个时候即便有一些算法会做自适应,例如双线性插值、三线性插值等等增加像素的数量,但是他们会造成边缘的模糊

光栅图放大后可见的像素

经过插值处理之后的图片会变的模糊

因此在不同场景下有不同的取舍。一般而言,当图形比较好描述(都是规则的几何图形)的时候,就会选择用矢量图,而当图形不好描述的时候(例如自然景观),这个时候就会选择光栅图

尽管地图都是规整的矩形和色块,使用矢量图非常容易描述,但是考虑到如果需要使用矢量图像的话还需要不少的学习成本来学习矢量图的描述语言,花费的时间比较多。而有考虑到Python中其实已经想Matplotlib、Seaborn、Plotly这类非常好的光栅图绘图库(当然他们可以绘制矢量图),因此最终决定使用光栅图来显示、绘制地图。

当然,其实Matplotlib、Plotly这些库最终调用的都是类似于cairo、gtk这些由C/C++开发出来的图形库

3. 地图动态显示

地图的绘制与显示实际上是一个非常消耗资源的操作。上面的GIF和后面放出来的B站的演示视频里A*算法花了不少的时间最后才找到最终的路径,但是真正的纯A*算法的规划路径的速度才只有0.2秒左右,这还是我在100*100的地图上测试的。然而为了显示出A*的动态效果图,因此算法的每次试探都会进行地图的绘制。而每次绘图就需要占去0.2秒左右的时间(我读取系统时间做了benchmark)。因此只敢显示30*30的地图。

因此为了尽可能的的加快绘图的速度,在图形中对所有的Patch(即一个小方格)进行记录,每次只会重新绘制发生了变化的Patch而不会重新绘制整张地图,因此极大地加快了显示的速度。至少在我测试的50*50的场景下效果还是非常不错的。

2. 算法实现层

算法实现层负责实现A*算法。A*算法中使用使用OpenList以及CloseList维护已探索区域和边缘区域。此外为实现算法实现层与核心层解耦合,而二维地图的状 态的维护由核心层提供,因此在算法实现层只需要调用核心层提供的接口进行地图状态修改即可

3. 用户界面层

用户界面层负责GUI的显示以及命令行用户界面。GUI部分通过Matplotlib内嵌的信号与槽机制实现用户鼠标、键盘事件的监听以及回调函数的绑定,进而响应用户事件,完成功能,例如:保存地图 。命令行用户界面负责根绝接收到不同的用户参数,运行不同的功能,例如绘制地图、开始路径规划……

3. 具体代码

1. 导入库

导入Python中队列维护的库queue、路径处理库pathlib、二维数组维护库Numpy、csv序列化与反序列化Pandas库、绘图库Matplotlib以及命令行彩色字体库colorama

import time
import queue
import pprint
import argparse
from typing import *
from pathlib import Path
from collections import namedtuple

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.backend_bases as bbase

from colorama import Fore, Style

2. 核心对象

_MapBase中维护的核心对象为map,程序所处的状态由一个有限状态机维护,不同的事件由不同的回调函数负责处理。

首先,__del__、new、update、add_patch实现了GUI图形的绘制以及更新

pandasfy与depandasfy负责将二维地图以csv文件格式保存至磁盘上

_btn_press_cb、_btn_release_cb、_move_cb、_keyboard_cb分别响应鼠标按下、松开、移动以及键盘按下事件

class _MapBase(object):
    """
    Notes:
        MapBase 用于可视化的地图创建
    Args:
        mapsize (tuple[int, int]): size of map
        figsize (tuple[int, int]): size of figure
    """

    def __init__(self, mapsize: Tuple[int, int], figsize: Tuple[int, int] = (12, 12)):
        super(_MapBase, self).__init__()
        # meta information
        self._mapsize: Tuple[int, int] = mapsize
        self._figsize: Tuple[int, int] = figsize

        # map, inf obstacle, 0 accessible, 1 in path
        self.map: np.ndarray = np.zeros(shape=mapsize)
        self.map_temp: np.ndarray = None

        # visualize core object
        self._figure, self._axes = plt.subplots(figsize=figsize)
        self._canvas = self._figure.canvas
        self._title: plt.Text = None
        self._go_on: bool = False

        # gui interaction, finite state machine to control plot
        self._on_press: bool = False
        self._current_state = "add_obstacle"
        self.btn_pressed_cid = self._canvas.mpl_connect("button_press_event", self._btn_press_cb)
        self.btn_released_cid = self._canvas.mpl_connect("button_release_event", self._btn_release_cb)
        self.mouse_move_cid = self._canvas.mpl_connect("motion_notify_event", self._move_cb)
        self.keyboard_cid = self._canvas.mpl_connect("key_press_event", self._keyboard_cb)

        self.new()

    def __del__(self):
        plt.close(self._figure)

    def start(self):
        assert False, f"This method should be overwritten by subclass"

    def update(self):
        self._canvas.draw_idle()

    def new(self) -> bool:
        """
        Notes:
            new is used to clear the map. In practice, generate a new axes
        Returns:
            if_clear (bool): true if successfully clear the map
        """
        try:
            self.map = np.zeros(shape=self._mapsize)
            self._axes.cla()
            self._axes.grid(True)
            self._axes.set_xticks(range(0, self._mapsize[0] + 1))
            self._axes.set_yticks(range(0, self._mapsize[1] + 1))
            self._axes.set_xlim(0, self._mapsize[0])
            self._axes.set_ylim(0, self._mapsize[1])
            self._title = self._axes.set_title(f"Current State: {self._current_state}")
            self._axes.scatter([], [], marker="s", c="red", label="goal")
            self._axes.scatter([], [], marker="s", c="black", label="obstacle")
            self._axes.scatter([], [], marker="s", c="green", label="start")
            self._axes.scatter([], [], marker="s", c="blue", label="final path")
            self._axes.scatter([], [], marker="s", c="gray", label="explored")
            self._axes.scatter([], [], marker="s", c="cyan", label="frontier")
            self._axes.legend(loc=(1.01, 0.85))
            self.update()
            return True
        except KeyboardInterrupt as e:
            print("Keyboard interrupted")
            return False

    def add_patch(self, x: Union[int, np.ndarray, list], y: Union[int, np.ndarray, list],
                  patches: Union[str, None] = None) -> bool:
        prompt = (x, y) if isinstance(x, int) else list(zip(x, y))
        if self._current_state == "add_obstacle":
            try:
                self.map[x, y] = np.inf
            except IndexError:
                pass
            self._axes.scatter(x + 0.5, y + 0.5, s=200, marker="s", c="black")
            print(f"Add obstacle: {prompt}")
        elif self._current_state == "add_goal":
            if (self.map == -1).any():
                print(f"{Fore.YELLOW}Already have one goal!{Style.RESET_ALL}")
                return False
            self.map[x, y] = -1
            self._axes.scatter(x + 0.5, y + 0.5, s=200, marker="s", c="red")
            print(f"Add goal: {prompt}")
        elif self._current_state == "add_start":
            if (self.map == -2).any():
                print(f"{Fore.YELLOW}Already have one start!{Style.RESET_ALL}")
                return False
            self.map[x, y] = -2
            self._axes.scatter(x + 0.5, y + 0.5, s=200, marker="s", c="green")
            print(f"Add start: {prompt}")
        elif self._current_state == "going_back":
            self.map[x, y] = -3
            self._axes.scatter(x + 0.5, y + 0.5, s=200, marker="s", c="blue")
        elif self._current_state == "clear_patch":
            if self.map[x, y] == 0:
                print(f"{Fore.YELLOW}Not in use!{Style.RESET_ALL}")
                return False
            self.map[x, y] = 0
            self._axes.scatter(x + 0.5, y + 0.5, s=250, marker="s", c="white")
            print(f"Cleared: {prompt}")
        elif self._current_state == "searching":
            if patches == "explored":
                # assert (0 <= self.map[x, y] <= 2).all(), f"{Fore.RED}Illegal Operation on map{Fore.RESET}"
                # self.map[x, y] = 1
                self._axes.scatter(x + 0.5, y + 0.5, s=200, marker="s", c="gray", alpha=1)
            elif patches == "frontier":
                # assert (0 <= self.map[x, y] <= 2).all(), f"{Fore.RED}Illegal Operation on map{Fore.RESET}"
                # self.map[x, y] = 2
                self._axes.scatter(x + 0.5, y + 0.5, s=200, marker="s", c="cyan", alpha=1)
        else:
            assert False, F"{Fore.RED}Invalid State: {self._current_state}"
        self.update()
        return True

    def pandasfy(self) -> pd.DataFrame:
        """
        Notes:
            pandasfy will serialize current map from matplotlib to cvs file
        Returns:
            serialized_map (pd.Dataframe)
        """
        pp: pd.DataFrame = pd.DataFrame(self.map.copy().T[::-1, :], dtype=str)
        pp.replace("-1.0", "GOAL", inplace=True)
        pp.replace("-2.0", "START", inplace=True)
        return pp

    def depandasfy(self, csv_path: Path):
        """
        Notes:
            depandasfy will deserialized csv file map to matplotlib
        Args:
            csv_path (Path): path of csv file
        Returns:
            None
        """
        df: pd.DataFrame = pd.read_csv(filepath_or_buffer=csv_path, index_col=0, header=0)
        df.replace("START", "-2.0", inplace=True)
        df.replace("GOAL", "-1.0", inplace=True)
        self.new()
        t = df.to_numpy(dtype=np.float64)[::-1, :].T
        # self.map = df.to_numpy()
        xs, ys = np.where(t == np.inf)
        self._current_state = "add_obstacle"
        self.add_patch(xs, ys)
        xs, ys = np.where(t == -1)
        self._current_state = "add_goal"
        self.add_patch(xs, ys)
        xs, ys = np.where(t == -2)
        self._current_state = "add_start"
        self.add_patch(xs, ys)

    def _update_title(self, s: str = None):
        if s is None:
            self._title.set_text(f"Current State: {self._current_state}")
        else:
            self._title.set_text(s)
        self.update()

    def _btn_press_cb(self, event: bbase.MouseEvent) -> None:
        self._on_press = True
        if event.inaxes:
            point = (int(event.xdata), int(event.ydata))
            self.add_patch(*point)

    def _btn_release_cb(self, event: bbase.MouseEvent) -> None:
        self._on_press = False

    def _move_cb(self, event: bbase.MouseEvent) -> None:
        if self._on_press:
            if event.inaxes:
                point = (int(event.xdata), int(event.ydata))
                self.add_patch(*point)

    def _keyboard_cb(self, event: bbase.KeyEvent):
        if event.key == "c":
            self._current_state = "add_obstacle"
            print(f"{Fore.YELLOW}Clear all!{Style.RESET_ALL}")
            self.new()
        elif event.key == "1":
            self._current_state = "add_obstacle"
            print(f"{Fore.GREEN}Switch to {self._current_state}{Style.RESET_ALL}")
        elif event.key == "2":
            self._current_state = "add_goal"
            print(f"{Fore.GREEN}Switch to {self._current_state}{Style.RESET_ALL}")
        elif event.key == "3":
            self._current_state = "add_start"
            print(f"{Fore.GREEN}Switch to {self._current_state}{Style.RESET_ALL}")
        elif event.key == "0" or event.key == "d":
            self._current_state = "clear_patch"
            print(f"{Fore.GREEN}Switch to {self._current_state}{Style.RESET_ALL}")
        elif event.key == "r":
            p = self._current_state
            self._current_state = "add_obstacle"
            xs = np.random.randint(0, self._mapsize[0], size=(n := self._mapsize[0] * self._mapsize[1] // 5))
            ys = np.random.randint(0, self._mapsize[1], size=n)
            self.add_patch(xs, ys)
            self._current_state = p
        elif event.key == "w":
            files = [i.stem for i in Path(__file__).resolve().absolute().parent.glob("map_*.csv")]
            if len(files) > 0:
                name = f"map_{int(max([f.split('_')[1] for f in files])) + 1}"
            else:
                name = "map_0"
            self.pandasfy().to_csv(p := Path(__file__).resolve().parent.joinpath(f"{name}.csv"))
            print(f"{Fore.GREEN}Save current Map to {p}{Style.RESET_ALL}")
        elif event.key == "v":
            print(f"{Fore.GREEN}Start finding goal{self._current_state}{Style.RESET_ALL}")
            self._go_on = True
            self.start()
        self._update_title()

3. 算法实现

算法实现由Astar类负责完成,其中由于在计算启发值时可以有多种算法,因此将启发值的计算以静态方法形式嵌入到类中

start方法为完整的A*算法实现,而plot4searching用于在路径规划时动态更新地图

此外OpenList和CloseList分别作为字典的键和值蕴含在came_from和cost_so_far两个字典中,因此不用单独开辟新的数组进行保存

class Astar(_MapBase):
    def __init__(self, load_path: Union[Path, None], mapsize: Tuple[int, int] = (30, 30)):
        super(Astar, self).__init__(mapsize=mapsize)

        if load_path is not None:
            print(f"{Fore.GREEN}Load from {load_path}{Style.RESET_ALL}")
            self.depandasfy(load_path)
            self.update()
            t = np.where(self.map == -2), np.where(self.map == -1)
            self.start_point: str = f"{t[0][0][0]},{t[0][1][0]}"
            self.goal_point: str = f"{t[1][0][0]},{t[1][1][0]}"
            # open list, but I think frontier is a better name
            # no need for close list for which has been represented as items of came_from or keys of cost_so_far
            self.frontier = queue.PriorityQueue()
            self.frontier.put((0, self.start_point))
            self.came_from: Dict[str, Union[str, None]] = {self.start_point: None}
            self.cost_so_far: Dict[str, float] = {self.start_point: 0}
            self._last: np.ndarray = None
            plt.show()
        else:
            print(f"{Fore.GREEN}Start drawing map{Style.RESET_ALL}")
            plt.show()

    def start(self):
        """
        Notes:
            A* algorithm implementation
        Returns:
            None
        """
        if_can_find_goal: bool = False
        # get the closest point
        self._current_state = "searching"
        self._update_title()
        while not self.frontier.empty():
            current_p: str = self.frontier.get()[1]

            # find the goal
            if current_p == self.goal_point:
                if_can_find_goal = True
                break

            next_p: str
            for next_p in (n := self._get_neighbor(current_p)):
                new_cost = self.cost_so_far[current_p] + self._get_cost(next_p, current_p)
                # if next point has not been considered / next point in in undiscovered area
                # or there exists a nearer way to get to next point by passing current point
                # this also can help preventing going back
                if next_p not in self.cost_so_far.keys() or new_cost < self.cost_so_far[next_p]:
                    self.cost_so_far[next_p] = new_cost
                    priority = new_cost + self._get_cost(next_p, self.goal_point)
                    self.frontier.put((priority, next_p))
                    current_pp = [int(i) for i in current_p.split(",")]
                    self.came_from[next_p] = f"{current_pp[0]},{current_pp[1]}"
                    self._plot4searching()
                    plt.pause(0.0001)
                    self.update()
        if not if_can_find_goal:
            print(f"{Fore.RED}Cannot find the goal in current map{Style.RESET_ALL}")
            self._update_title("STOP: cannot find the path!")
            return
        else:
            # go back to find the path
            self._current_state = "add_goal"
            goal = (int(i) for i in self.goal_point.split(","))
            self.add_patch(*goal)
            self._current_state = "going_back"
            self._update_title()

            path = []
            current_p = self.goal_point
            while current_p != self.start_point:
                current_p = self.came_from[current_p]
                p = tuple(int(i) for i in current_p.split(","))
                path.append(p)
                self.add_patch(*p)
            self._current_state = "add_start"
            start = (int(i) for i in self.start_point.split(","))
            self.add_patch(*start)
        print(f"{Fore.BLUE}{Style.BRIGHT}Success!{Style.RESET_ALL}")
        print("Path:")
        pprint.pprint(path)
        return path

    def _get_neighbor(self, point: str):
        from itertools import product
        x, y = [int(i) for i in point.split(",")]
        neighbor = []
        for i, j in product([-1, 0, 1], [-1, 0, 1]):
            if abs(i) + abs(j) == 0:
                continue
            if 0 <= (dx := x + i) <= self._mapsize[0] and 0 <= (dy := y + j) <= self._mapsize[1] and \
                    self.map[dx, dy] != np.inf:
                neighbor.append(f"{dx},{dy}")
        return neighbor

    def _get_cost(self, point1: str, point2: str):
        # x, y = [int(i) for i in point.split(",")]
        # return self.manhattan_distance(*[int(i) for i in point1.split(",")],
        #                                *[int(i) for i in point2.split(",")])
        return self.euler_distance(*[int(i) for i in point1.split(",")],
                                   *[int(i) for i in point2.split(",")])

    def _plot4searching(self):
        """Codes for plotting"""
        # update current map
        # explore area
        explored_ps = ([], [])
        for p, parent in self.came_from.items():
            pp = [int(i) for i in p.split(",")]
            if parent is not None:
                explored_ps[0].append(pp[0])
                explored_ps[1].append(pp[1])
        self.map[np.array(explored_ps[0]), np.array((explored_ps[1]))] = 1

        # frontier
        frontier_ps = ([], [])
        t = []
        while not self.frontier.empty():
            t.append(self.frontier.get())
        for i in t:
            self.frontier.put(i)
        for tt in t:
            # frontier_ps.append(t.get()[1])
            p = [int(i) for i in tt[1].split(",")]
            frontier_ps[0].append(p[0])
            frontier_ps[1].append(p[1])
        # simply deepcopy locked object will raise an error
        # t = copy.deepcopy(self.frontier)
        self.map[np.array(frontier_ps[0]), np.array(frontier_ps[1])] = 2

        # find difference
        if self._last is None:
            update_explored = list(zip(*np.where(self.map == 1)))
            update_frontier = list(zip(*np.where(self.map == 2)))
            # update_frontier = [(i,j) for i, j in zip(*update_frontier)]
        else:
            last_explored = list(zip(*np.where(self._last == 1)))
            last_frontier = list(zip(*np.where(self._last == 2)))
            current_explored = list(zip(*np.where(self.map == 1)))
            current_frontier = list(zip(*np.where(self.map == 2)))
            update_explored = list(set(current_explored) - set(last_explored))
            update_frontier = list(set(current_frontier) - set(last_frontier))
        self._last = self.map.copy()
        # draw explored area
        if len(update_explored) > 0:
            self.add_patch(np.array([i[0] for i in update_explored]), np.array([i[1] for i in update_explored]),
                           patches="explored")
        # draw frontier
        if len(update_frontier) > 0:
            self.add_patch(np.array([i[0] for i in update_frontier]), np.array([i[1] for i in update_frontier]),
                           patches="frontier")

    @staticmethod
    def manhattan_distance(x1, y1, x2, y2):
        return abs(x2 - x1) + abs(y2 - y1)

    @staticmethod
    def euler_distance(x1, y1, x2, y2):
        if (d := abs(x2 - x1) + abs(y2 - y1)) == 1:
            return 1
        elif d == 2:
            return 1.4
        else:
            return round(np.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2), ndigits=2)

4. 命令行参数解析

命令行参数解析由parse_arg函数负责完成,并返回Namespace对象

def parse_arg():
    parer = argparse.ArgumentParser(
        description=f"{Fore.GREEN}{Style.BRIGHT}A* Algorithm, animated with matplotlib{Style.RESET_ALL}. "
                    f"Author: {Fore.YELLOW}Jack Wang{Fore.RESET}. "
                    f"Date: {Fore.YELLOW}2021-12-15{Style.RESET_ALL}."
                    "This script provides A* algorithm illustration with the help of matplotlib\n"
                    f"You can use this script either by {Fore.GREEN}command-line interface{Fore.RESET} "
                    f"or {Fore.GREEN}python code interface{Fore.RESET}."
                    "To use this script with command-line interface, call this script with -h option. "
                    "To use this script in python code, "
                    "simply calling Astar class to create an Astar object, then everything will be done. "
                    "If load_path argument of Astar class are not specified (i.e. load_path=None), "
                    "this program will create a blank"
                    "map for you, else loading an existing map."
                    "\n\n"
                    "In the matplotlib GUI, press 1 to add obstacle, 2 to add goal, 3 to add start, w to save map, "
                    "r to random generate obstacles, d to delete an grid and v to start path planning. "
                    f"{Fore.YELLOW}Note: when call with -c, you can not press v to start path planning, -c is only "
                    f"for drawing map. {Style.RESET_ALL}")
    parer.add_argument("-o", dest="ok", action="store_true", help="must be added when calling the scripts to let me "
                                                                  "know you understand how to use this program")
    parer.add_argument("-c", dest="gen_map", action="store_true", help="create a new map")
    parer.add_argument("-l", dest="load_num", type=int, default=-1, help="load a existing map")
    return parer.parse_args()

5. 主程序

主程序中若不提供加载的地图路径则默认打开新的地图用于地图绘制

def main():
    args = parse_arg()
    if not args.ok:
        print(f"{Fore.RED}Please read the help info with -h option the rerun the scripts with -o option{Fore.RESET}")
        return
    if args.gen_map:
        Astar(load_path=None)
        return
    if args.load_num != -1:
        Astar(load_path=Path(__file__).resolve().parent.joinpath(f"map_{args.load_num}.csv"))


if __name__ == "__main__":
    # a = Astar(load_path=Path("./map_4.csv"))
    # a = Astar(load_path=Path(__file__).resolve().parent.joinpath("./map_1.csv"))
    main()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

A*算法-Python实现 的相关文章

  • 与区域指示符字符类匹配的 python 正则表达式

    我在 Mac 上使用 python 2 7 10 表情符号中的标志由一对表示区域指示符号 https en wikipedia org wiki Regional Indicator Symbol 我想编写一个 python 正则表达式来在
  • 如何用python脚本控制TP LINK路由器

    我想知道是否有一个工具可以让我连接到路由器并关闭它 然后从 python 脚本重新启动它 我知道如果我写 import os os system ssh l root 192 168 2 1 我可以通过 python 连接到我的路由器 但是
  • Python 中的舍入浮点问题

    我遇到了 np round np around 的问题 它没有正确舍入 我无法包含代码 因为当我手动设置值 而不是使用我的数据 时 返回有效 但这是输出 In 177 a Out 177 0 0099999998 In 178 np rou
  • 需要在python中找到print或printf的源代码[关闭]

    很难说出这里问的是什么 这个问题是含糊的 模糊的 不完整的 过于宽泛的或修辞性的 无法以目前的形式得到合理的回答 如需帮助澄清此问题以便重新打开 访问帮助中心 help reopen questions 我正在做一些我不能完全谈论的事情 我
  • 删除flask中的一对一关系

    我目前正在使用 Flask 开发一个应用程序 并且在删除一对一关系中的项目时遇到了一个大问题 我的模型中有以下结构 class User db Model tablename user user id db Column db String
  • 独立滚动矩阵的行

    我有一个矩阵 准确地说 是 2d numpy ndarray A np array 4 0 0 1 2 3 0 0 5 我想滚动每一行A根据另一个数组中的滚动值独立地 r np array 2 0 1 也就是说 我想这样做 print np
  • 如何使用 Pandas、Numpy 加速 Python 中的嵌套 for 循环逻辑?

    我想检查一下表的字段是否TestProject包含了Client端传入的参数 嵌套for循环很丑陋 有什么高效简单的方法来实现吗 非常感谢您的任何建议 def test parameter a list parameter b list g
  • Python 2:SMTPServerDisconnected:连接意外关闭

    我在用 Python 发送电子邮件时遇到一个小问题 me my email address you recipient s email address me email protected cdn cgi l email protectio
  • 从Python中的字典列表中查找特定值

    我的字典列表中有以下数据 data I versicolor 0 Sepal Length 7 9 I setosa 0 I virginica 1 I versicolor 0 I setosa 1 I virginica 0 Sepal
  • 在 Sphinx 文档中*仅*显示文档字符串?

    Sphinx有一个功能叫做automethod从方法的文档字符串中提取文档并将其嵌入到文档中 但它不仅嵌入了文档字符串 还嵌入了方法签名 名称 参数 我如何嵌入only文档字符串 不包括方法签名 ref http www sphinx do
  • 如何使用 pybrain 黑盒优化训练神经网络来处理监督数据集?

    我玩了一下 pybrain 了解如何生成具有自定义架构的神经网络 并使用反向传播算法将它们训练为监督数据集 然而 我对优化算法以及任务 学习代理和环境的概念感到困惑 例如 我将如何实现一个神经网络 例如 1 以使用 pybrain 遗传算法
  • 加快网络抓取速度

    我正在使用一个非常简单的网络抓取工具抓取 23770 个网页scrapy 我对 scrapy 甚至 python 都很陌生 但设法编写了一个可以完成这项工作的蜘蛛 然而 它确实很慢 爬行 23770 个页面大约需要 28 小时 我看过scr
  • Jupyter Notebook 找不到 Python 模块

    不知道发生了什么 但每当我使用 ipython 氢 原子 或 jupyter 笔记本时都找不到任何已安装的模块 我知道我安装了 pandas 但笔记本说找不到 我应该补充一点 当我正常运行脚本时 python script py 它确实导入
  • Python3 在 DirectX 游戏中移动鼠标

    我正在尝试构建一个在 DirectX 游戏中执行一些操作的脚本 除了移动鼠标之外 我一切都正常 是否有任何可用的模块可以移动鼠标 适用于 Windows python 3 Thanks I used pynput https pypi or
  • 从 NumPy ndarray 中选择行

    我只想从 a 中选择某些行NumPy http en wikipedia org wiki NumPy基于第二列中的值的数组 例如 此测试数组的第二列包含从 1 到 10 的整数 gt gt gt test numpy array nump
  • 如何使用原始 SQL 查询实现搜索功能

    我正在创建一个由 CS50 的网络系列指导的应用程序 这要求我仅使用原始 SQL 查询而不是 ORM 我正在尝试创建一个搜索功能 用户可以在其中查找存储在数据库中的书籍列表 我希望他们能够查询 书籍 表中的 ISBN 标题 作者列 目前 它
  • 模拟pytest中的异常终止

    我的多线程应用程序遇到了一个错误 主线程的任何异常终止 例如 未捕获的异常或某些信号 都会导致其他线程之一死锁 并阻止进程干净退出 我解决了这个问题 但我想添加一个测试来防止回归 但是 我不知道如何在 pytest 中模拟异常终止 如果我只
  • Django-tables2 列总计

    我正在尝试使用此总结列中的所有值文档 https github com bradleyayers django tables2 blob master docs pages column headers and footers rst 但页
  • 更改 Tk 标签小部件中单个单词的颜色

    我想更改 Tkinter 标签小部件中单个单词的字体颜色 我知道可以使用文本小部件来实现与我想要完成的类似的事情 例如使单词 YELLOW 显示为黄色 self text tag config tag yel fg clr yellow s
  • 使用随机放置的 NaN 创建示例 numpy 数组

    出于测试目的 我想创建一个M by Nnumpy 数组与c随机放置的 NaN import numpy as np M 10 N 5 c 15 A np random randn M N A mask np nan 我在创建时遇到问题mas

随机推荐

  • IDEA插件:智能代码生成器,附带注释和性能/安全检测功能

    x1f680 1 安装插件 在插件中搜索关键字 biot 点击安装 x1f680 2 代码生成 右侧的侧边栏点击biot后 在下方的输入框中输入你要问的内容 x1f680 3 biot AI 选中选区中的代码 点击鼠标右键让ai来帮你改代码
  • 安装Windows Server 2016 服务器 标准版

    注意事项 xff1a 安装带桌面版的 管理员密码设置 xff0c 要 注意大小写加数字 xff0c 不然会设置失败 安装文件下载 xff1a MSDN 我告诉你 PE U盘 微PE 服务器的驱动 xff0c 可以自己到对应服务器厂家的官网上
  • 第五节:基于Pytorch的相关可视化

    第五节 xff1a 基于Pytorch的相关可视化 在Pytorch发布后 xff0c 网络及训练过程的可视化工具也相应的被开发出来来帮助用户监督所建立的模型的结构和训练过程 本章将讲解HiddenLayer库 xff0c HiddenLa
  • 第六节:Pytorch实现全连接神经网络

    第六节 xff1a Pytorch实现全连接神经网络 前面的五节中 xff0c 我们讲解了使用PyTorch搭建一个神经网络中需要的需要各种技巧 xff0c 包括 xff1a 网络的搭建 选择不同的实践技巧 xff08 优化器选择 学习率下
  • 使用Visual Studio Code开发Arduino踩坑日记(持续更新)

    使用Visual Studio Code开发Arduino踩坑日记 持续更新 文章目录 使用Visual Studio Code开发Arduino踩坑日记 持续更新 1 在browse path中未找到包含文件问题描述问题分析解决思路解决过
  • 小白安装Ubuntu 18.04 LTS

    文章目录 小白安装Ubuntu 18 04 LTS作者 xff1a 王仕鸿日期 xff1a 2020 10 10 前言 xff08 可跳过 xff09 Ubuntu介绍操作系统介绍Ubuntu介绍 安装Ubuntu 18 04 LTS步骤一
  • 1_ROS基础

    ROS基础 本章讲解ROS中最基础的概念 不明白这些概念是没法学懂ROS的 学习了这些概念 后面我们将通过实操来在实践的过程中进一步体会 ROS是什么 ROS Robot Operating System 机器人操作系统 是一个提供一系列程
  • 2_ROS中的命令行工具

    ROS中的命令行工具 ROS中为我们提供了丰富的命令行工具 帮助我们进行代码的编写 调试 测试 框架的搭建 数据的显示等等 大图如下 所有的命令大致可以分为四类 分别是运行相关命令 编译相关命令 包制作管理相关命令 项目创建相关命令 下面进
  • 3_ROS创建工作空间和功能包

    3 ROS创建工作空间和功能包 前面我们讲解了ROS中的核心概念和使用ROS进行开发时候必须用到的命令行工具 下面我们就正式开始ROS中的开发 我们首先从创建工作空间和功能包开始 1 工作空间WorkSpace 工作空间是ROS中非常重要的
  • 4_Publieher的编程实现

    4 Publisher的编程实现 我们前面讲解了如何创建工作空间和功能包 但是我们都仅仅只创建了一个空的工作空间和功能包 什么都没有实现 我们想要进一步为功能包添加功能 就不可避免的需要添加Publisher和Subscriber 下面我们
  • 1.Latex介绍

    Latex介绍 本人鸿神 目前就读于XJTU 是一个即将开始科研的小白 既然做科研未来就无法避免发表论文 而发表论文就需要用到一系列的工具 Latex就是其中之一 谨以此文记录我的科研路 也希望Latex这一系列文章能够帮到各位 1 什么是
  • 2.Latex安装和TeXworks Editor基础

    二 Latex安装和TeXworks Editor使用教程 上一章我们讲解了什么是Latex和为什么我们要学习Latex 从这一章开始我们就要正式开始学习Latex 就像前面所讲的 Latex包含编译器和编辑器 我们需要在编辑器中编写夹杂代
  • 关于“ErrorFlash Download failed“Cortex-M3”的解决办法

    首先 xff0c 将仿真器连接电脑 xff0c 然后打开KEIL xff0c 点击FLash gt Erase xff0c 擦除Flash试一下 如果擦除不成功 xff0c 那么应该是的STM32的Flash被锁了 xff0c 要解锁一下
  • 3.Latex语法基础:命令与环境

    三 Latex语法基础 命令与环境 前面我们已经做好了开始编写Latex的一切准备工作 从这章开始 我们将开始讲解Latex语法 本章将讲解Latex语法的基础 命令与环境 1 命令与环境 命令 什么是命令 不同于其他编程语言 C C 43
  • Arduino多种传感器使用方法

    Arduino项目 智能窗户 前段时间参加了一个Arduino的比赛 具体内容就是用Arduino设计一个项目出来 我在的队伍的设计的项目就是智能窗户 智能窗户可以采集本地传感器采集到的环境参数 根据参数具有一套内部的逻辑判断 可以对温度
  • Paxos算法

    Paxos算法 Paxos算法是一系列共识算法中的一个 其目的就是为了解决共识 一致性问题 这个Github连接中详细的列出了多种共识算法 还有一些工程实践的例子 腾讯 Zookeeper Handpoo下的一个分布式框架 Handoop是
  • Python中的推导式

    推导式 Compression 推导式 Compression 是Python语言的一大特色 相比于其他语言而言 推导式使得Python能够编辑的进行循环 创建出特定的字典 列表等可迭代对象 使用推导式可以避免代码的冗长 简化代码风格 使得
  • A* 算法 Python实现

    A 算法 Python实现 最近考试周 没时间封装的很好 代码分两部分 定义和调用部分 定义部分 span class token keyword import span numpy span class token keyword as
  • Pytorch中使用TensorBoard

    本文记录了如何在Pytorch中使用Tensorboard xff08 主要是为了备忘 xff09 Pytorch中使用TensorBoard 虽然我本身就会用TensorBoard xff0c 但是因为TensorBoard只有在写训练代
  • A*算法-Python实现

    好久没有在CSDN上发文章了 xff0c 快一年了吧 这两天重新登录了一下 xff0c 不看不知道 xff0c 一看吓一跳 xff0c 没想到访问量快13万了 之前写博客的时候 xff0c 想着把一些有用的东西写下来 xff0c 一方面是当