1. 需求
想要使用Python实现一个出租车仿真环境,其中每个时间窗口内产生的request及其周围的taxi满足一个二分图的关系。原本计划request与taxi之间的匹配按照接客时间权值最小为目标进行匹配,但是后面实践的时候觉得存在一定问题:①单个目标的权值设置为订单出行距离/接客时间比仅用接客时间作为权值更好。②最小匹配+非完备匹配+存在不可连接点的约束使得出现很多bug,且没想到解决方案。于是后面觉得改用出行距离/接客时间作为权值实现二分图最优匹配。
2.KM算法
KM算法是解决带权值的二分匹配问题的经典算法。解决request和taxi的匹配问题首先需要先了解二分图以及二分图的最优匹配(KM算法)。
二分图的理解这里不做解释,仅提示区分好二分图的几个重要概念:
- 完备匹配:X或Y中全部匹配完
- 完美匹配:XY都匹配完
- 最佳匹配:权值最大的完备匹配(不是绝对的权值最大,因为限定了完备)。但可以增加一些权值为0的边使其统一起来
- 带权匹配:就是求出一个匹配集合,使得集合中边的权值之和最大或最小
- 相等子图:选择‘边权=顶标和’的边组成
- 匈牙利算法是利用了增广的性质,使整个图匹配数最大。KM算法利用匈牙利算法求完备匹配下的带权二分图最佳匹配的算法。
KM算法理解可参考:
KM算法入门
KM算法运用篇
KM算法详解
重要的操作和踩坑(提醒):
- 最小匹配的问题可以通过权值取反实现(当权值都为正数时)
- KM算法必须存在一个完备匹配,如果求一个最大权匹配(不一定完备),添加一些边权值为0的连接,实现最大权匹配。
- KM算法求是边权值和最大,若需边权之积最大,对每条边权取自然对数,然后求最大和权匹配,求得的结果a再算出e^a可得最大积匹配。
- 算法中每次是选择交错树中顶标和与边权差最小的边。(计算一个d=min{L(x) + L(y) - wight(x,y)}
- 设计算法按照不同的需求可能多次出现使用一个很大的数,如①代表二分图不可连接两点之间权值、②初始化的最小已寻找到的最小连接inc,③用大数-原权值求最小匹配。 的过程中注意区分几个很大的数之间的大小关系
KM算法实现
可以直接调用的函数
m = Munkres()
indexes = m.compute(matrix)
使用方法:
"""
@author:HY
@time:2020/12/3:19:15
"""
def cost(graph):
"""
计算成本函数(路径最小化)
:return:
"""
m = Munkres()
indexes = m.compute(graph)
print(indexes)
total = 0
for row, column in indexes:
value = graph[row][column]
total += value
print('最小成本%r:'%total)
def profit():
"""
计算利润的函数(路径最大化)
其实利用一个取相反数转化为成本的形式再计算的,得到对应的线最后再从原本的矩阵中取值。也可以使用大数-原权值的方法
:return:
"""
graph = [[5, 9, 1],
[10, 3, 2],
[8, 7, 4]]
cost_graph = []
for row in graph:
cost_row = []
for col in row:
cost_row += [- col]
cost_graph += [cost_row]
m = Munkres()
indexes = m.compute(cost_graph)
total = 0
for row, column in indexes:
value = graph[row][column]
total += value
print('最大利润%r:'%total)
内部逻辑完整实现
按照对算法的理解和部分资料的改编结合。按照左边的男生和右边的女生为二分图匹配案例设计,故代码中的left和boy是相等概念,right和girl是相等概念。
"""
@author:HY
@time:2020/12/3:19:15
"""
import numpy as np
from munkres import Munkres, print_matrix
import sys
class KM_Algorithm_1:
"""
此类是一个对KM算法的成功实现。
本需求中不可直接使用,因为订单与车辆之间可能不能匹配的边以及订单会匹配失败的可能性
"""
def __init__(self, Bipartite_Graph):
self.Bipartite_Graph = Bipartite_Graph
self.left = self.Bipartite_Graph.shape[0]
self.right = self.Bipartite_Graph.shape[1]
self.label_left = np.max(self.Bipartite_Graph, axis=1)
self.label_right = np.zeros(self.right)
self.visit_left = np.zeros(self.left, dtype=bool)
self.visit_right = np.zeros(self.right, dtype=bool)
self.match_right = np.empty(self.left) * np.nan
self.inc = 9999
def match(self, boy):
boy = int(boy)
self.visit_left[boy] = True
for girl in range(self.right):
if not self.visit_right[girl]:
gap = self.label_left[boy] + self.label_right[girl] - self.Bipartite_Graph[boy][girl]
if gap == 0:
self.visit_right[girl] = True
if np.isnan(self.match_right[girl]) or self.match(self.match_right[girl]):
self.match_right[girl] = boy
return 1
elif self.inc > gap:
self.inc = gap
return 0
def Kuh_Munkras(self):
self.match_right = np.empty(self.left) * np.nan
for man in range(self.left):
while True:
self.inc = 9999
self.reset()
if self.match(man):
break
for k in range(self.left):
if self.visit_left[k]:
self.label_left[k] -= self.inc
for n in range(self.right):
if self.visit_right[n]:
self.label_right[n] += self.inc
def calculateSum(self):
sum = 0
for i in range(self.left):
sum += self.Bipartite_Graph[int(self.match_right[i])][i]
return sum
def getResult(self):
return self.match_right
def reset(self):
self.visit_left = np.zeros(self.left, dtype=bool)
self.visit_right = np.zeros(self.right, dtype=bool)
def change_cost(matrix):
"""
与上面的类搭配使用计算成本
:param matrix:
:return:
"""
cost_matrix = []
for row in matrix:
cost_row = []
for col in row:
cost_row += [- col]
cost_matrix += [cost_row]
km = KM_Algorithm_1(Bipartite_Graph=np.array(cost_matrix))
km.Kuh_Munkras()
km.Bipartite_Graph = - km.Bipartite_Graph
print('最小权值:')
print(km.Bipartite_Graph)
print(km.calculateSum())
def test1():
"""
测试最大匹配
:return:
"""
n = input("输入行列数n:")
if n == '':
print("使用默认矩阵启动")
graph = [[5, 9, 1], [10, 3, 2], [8, 7, 4]]
km = KM_Algorithm_1(Bipartite_Graph=np.array(graph))
else:
n = int(n)
graph = [[0] * n] * n
print("输入一个n*n的二维数组,同一行的数字使用空格分隔,不同行的数字用回车换行:")
for i in range(n):
graph[i] = input().split(" ")
graph[i] = [int(j) for j in graph[i]]
print(graph)
km = KM_Algorithm_1(Bipartite_Graph=np.array(graph))
km.Kuh_Munkras()
print('最大权值:')
print(km.Bipartite_Graph)
print(km.calculateSum())
def test2():
"""
测试最小匹配
:return:
"""
graph = [[5, 9, 1], [10, 3, 2], [8, 7, 4]]
change_cost(graph)
if __name__ == '__main__':
test1()
3.订单匹配与KM算法结合
不同点:
- 与KM算法不同,仿真系统选择权值最优的情况匹配,允许request接单失败,taxi也可以不接客,即不是完备匹配。
- 当在一个较大的空间范围内收集request的时候,收集到的对应的taxies很多,在他们形成的二分图中,每个订单只与某几辆车存在连接关系,而与大部分taxi不存在连接关系。
- 以下两个匹配目标存在冲突,需要清晰自己的目标:
- 尽可能使更多request可以获得响应(类似最优匹配,但仅尽可能完备)
- 尽可能使得权值最优(带权匹配)
我的实现:
"""
@author:HY
@time:2020/12/6:21:35
"""
import numpy as np
class KM_Algorithm_4:
"""
这个版本是将权值变成订单除以时间,这样就是求最大匹配了,但需要注意的是,时间可能为0此时不能相除,需要另外处理
"""
def __init__(self, Bipartite_Graph):
self.Bipartite_Graph = Bipartite_Graph
self.left = self.Bipartite_Graph.shape[0]
self.right_true = self.Bipartite_Graph.shape[1]
self.right = self.Bipartite_Graph.shape[1] + self.left
self.reshape_graph()
self.label_left = np.max(self.Bipartite_Graph, axis=1)
self.label_right = np.zeros(self.right)
self.visit_left = np.zeros(self.left, dtype=bool)
self.visit_right = np.zeros(self.right, dtype=bool)
self.match_right = np.empty(self.right) * np.nan
self.inc = 1000*1000*1000
self.fail_boy = list()
def reshape_graph(self):
new = np.ones((self.left, self.left)) * 0
self.Bipartite_Graph = np.column_stack((self.Bipartite_Graph, new))
print(self.Bipartite_Graph)
def match(self, boy):
print("---------------我是boy%d----------------------" % boy)
boy = int(boy)
self.visit_left[boy] = True
for girl in range(self.right):
if not self.visit_right[girl] and self.Bipartite_Graph[boy][girl] >= 0:
gap = self.label_left[boy] + self.label_right[girl] - self.Bipartite_Graph[boy][girl]
if gap == 0:
self.visit_right[girl] = True
if np.isnan(self.match_right[girl]) or self.match(self.match_right[girl]):
self.match_right[girl] = boy
return 1
elif self.inc > gap:
self.inc = gap
return 0
def Kuh_Munkras(self):
self.match_right = np.empty(self.right) * np.nan
for man in range(self.left):
while True:
self.inc = 1000*1000
self.reset()
if self.match(man):
break
for k in range(self.left):
if self.visit_left[k]:
self.label_left[k] -= self.inc
for n in range(self.right):
if self.visit_right[n]:
self.label_right[n] += self.inc
return self.fail_boy
def calculateSum(self):
sum = 0
boys_girls = []
self.fail_boy = [i for i in range(self.left)]
for i in range(self.right_true):
if not np.isnan(self.match_right[i]):
sum += self.Bipartite_Graph[int(self.match_right[i])][i]
boy_girl = (int(self.match_right[i]), i)
boys_girls.append(boy_girl)
self.fail_boy.remove(int(self.match_right[i]))
print("最短路径:", sum)
return boys_girls, self.fail_boy
def getResult(self):
return self.match_right
def reset(self):
self.visit_left = np.zeros(self.left, dtype=bool)
self.visit_right = np.zeros(self.right, dtype=bool)
def test3():
graph = [[-1, -1, -1, 9, -1],
[4, -1, 4, -1, 4],
[-1, -1, -1, 6, -1],
[-1, -1, -1, 6, -1],
[-1, 5, -1, -1, -1],
[-1, -1, -1, 11, -1]]
print(graph)
km = KM_Algorithm_4(np.array(graph))
km.Kuh_Munkras()
boys_girls, fail_boys = km.calculateSum()
print("匹配男女列表", boys_girls)
print("失败男孩列表", fail_boys)
if __name__ == '__main__':
test3()
[未完全测试,存在bug]
[未完待续]
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)