HPCC2025/mTSP_solver.py

import numpy as np
import yaml
class mTSP(object):
    '''
    Solve the TSP (extended to multiple salesmen) with Q-Learning.
    Author: Surfer Zen @ https://www.zhihu.com/people/surfer-zen
    '''

    def __init__(self,
                 params='params',
                 num_cities=15,
                 cities=None,
                 num_cars=2,
                 center_idx=[0],
                 rectangles=None,
                 alpha=1,
                 beta=4,
                 learning_rate=0.01,
                 eps=0.1,
                 ):
        '''
        Args:
            params (str): base name of the YAML config file (loaded as params + '.yml')
            num_cities (int): number of cities
            cities (np.ndarray): 2 x num_cities array of city coordinates
            num_cars (int): number of vehicles (salesmen)
            center_idx (list[int]): indices of the duplicated depot cities used to split the tour
            rectangles (list[dict]): per-region data providing 'flight_time' and 'bs_time'
            alpha (float): hyperparameter; larger values favor moving to the nearest city
            beta (float): hyperparameter; larger values favor moves likely to give the shortest overall tour
            learning_rate (float): learning rate
            eps (float): exploration rate; larger values explore more but converge more slowly
        '''
        np.random.seed(42)
        self.num_cities = num_cities
        self.cities = cities
        self.num_cars = num_cars
        self.center_idx = center_idx
        self.rectangles = rectangles
        self.alpha = alpha
        self.beta = beta
        self.eps = eps
        self.learning_rate = learning_rate
        self.distances = self.get_dist_matrix()
        self.mean_distance = self.distances.mean()
        self.qualities = np.zeros([num_cities, num_cities])
        self.normalizers = np.zeros(num_cities)
        self.best_path = None
        self.best_path_length = np.inf

        with open(params + '.yml', 'r', encoding='utf-8') as file:
            cfg = yaml.safe_load(file)
        self.H = cfg['H']
        self.W = cfg['W']
        # Note: this overrides the num_cars passed to the constructor.
        self.num_cars = cfg['num_cars']
        self.flight_time_factor = cfg['flight_time_factor']
        self.comp_time_factor = cfg['comp_time_factor']
        self.trans_time_factor = cfg['trans_time_factor']
        self.car_time_factor = cfg['car_time_factor']
        self.bs_time_factor = cfg['bs_time_factor']
        self.flight_energy_factor = cfg['flight_energy_factor']
        self.comp_energy_factor = cfg['comp_energy_factor']
        self.trans_energy_factor = cfg['trans_energy_factor']
        self.battery_energy_capacity = cfg['battery_energy_capacity']
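
    # A minimal sketch of the params.yml expected above. The keys match what
    # __init__ reads; the values are illustrative assumptions only, not the
    # project's actual configuration:
    #
    #   H: 600
    #   W: 800
    #   num_cars: 2
    #   flight_time_factor: 1.0
    #   comp_time_factor: 1.0
    #   trans_time_factor: 1.0
    #   car_time_factor: 1.0
    #   bs_time_factor: 1.0
    #   flight_energy_factor: 1.0
    #   comp_energy_factor: 1.0
    #   trans_energy_factor: 1.0
    #   battery_energy_capacity: 100.0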
    def get_dist_matrix(self):
        '''
        Compute the pairwise distance matrix from the city coordinates.
        '''
        dist_matrix = np.zeros([self.num_cities, self.num_cities])
        for i in range(self.num_cities):
            for j in range(self.num_cities):
                if i == j:
                    continue
                xi, xj = self.cities[0, i], self.cities[0, j]
                yi, yj = self.cities[1, i], self.cities[1, j]
                dist_matrix[i, j] = np.sqrt((xi-xj)**2 + (yi-yj)**2)
        return dist_matrix
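
    # An equivalent vectorized form (an alternative sketch, not used by the
    # original code): with self.cities of shape (2, num_cities),
    #   diff = self.cities[:, :, None] - self.cities[:, None, :]
    #   dist_matrix = np.sqrt((diff ** 2).sum(axis=0))
    # produces the same matrix, with zeros on the diagonal.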
    def rollout(self, start_city_id=None):
        '''
        Starting from the region center, walk between cities according to the
        current policy until every city has been visited once, then return to
        the start.
        '''
        cities_visited = []
        action_probs = []
        current_city_id = start_city_id
        cities_visited.append(current_city_id)
        while len(cities_visited) < self.num_cities:
            current_city_id, action_prob = self.choose_next_city(
                cities_visited)
            cities_visited.append(current_city_id)
            action_probs.append(action_prob)
        cities_visited.append(cities_visited[0])
        action_probs.append(1.0)
        path_length = self.calc_max_length(cities_visited)
        if path_length < self.best_path_length:
            self.best_path = cities_visited
            self.best_path_length = path_length
        rewards = self.calc_path_rewards(cities_visited, path_length)
        return cities_visited, action_probs, rewards
    def choose_next_city(self, cities_visited):
        '''
        Choose the next city according to the current policy.
        '''
        current_city_id = cities_visited[-1]
        # Exponentiate the qualities to build softmax probabilities.
        probabilities = np.exp(self.qualities[current_city_id])
        # Zero out the cities that have already been visited.
        for city_visited in cities_visited:
            probabilities[city_visited] = 0
        # Normalize to obtain the softmax probabilities.
        probabilities = probabilities/probabilities.sum()
        if np.random.random() < self.eps:
            # With probability eps, sample from the softmax distribution.
            next_city_id = np.random.choice(
                range(len(probabilities)), p=probabilities)
        else:
            # With probability (1 - eps), take the current greedy choice.
            next_city_id = probabilities.argmax()
        # Probability of the chosen action under this behavior policy.
        if probabilities.argmax() == next_city_id:
            action_prob = probabilities[next_city_id]*self.eps + (1-self.eps)
        else:
            action_prob = probabilities[next_city_id]*self.eps
        return next_city_id, action_prob
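
    # Worked example of the action probability above: with eps = 0.1 and
    # softmax probabilities [0.0 (visited), 0.5, 0.3, 0.2], the greedy city 1
    # is chosen with probability 0.5 * 0.1 + 0.9 = 0.95, city 2 with
    # 0.3 * 0.1 = 0.03, and city 3 with 0.2 * 0.1 = 0.02 (summing to 1).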
    def calc_path_rewards(self, path, path_length):
        '''
        Compute the rewards for a given path.
        Args:
            path (list[int]): the path, each element being a city id
            path_length (float): the path length
        Returns:
            rewards: per-step rewards; the larger the total distance and the
                current step's distance, the smaller the reward
        '''
        rewards = []
        for fr, to in zip(path[:-1], path[1:]):
            dist = self.distances[fr, to]
            reward = (self.mean_distance/path_length)**self.beta
            if dist == 0:
                reward = 1
            else:
                reward = reward*(self.mean_distance/dist)**self.alpha
            rewards.append(np.log(reward))
        return rewards
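
    # In formula form, for a step fr -> to with distance d on a rollout whose
    # longest per-vehicle time is L (as returned by calc_max_length):
    #   reward = log[(mean_distance / L) ** beta * (mean_distance / d) ** alpha]
    # and steps with d == 0 (moves between co-located depot copies) get
    # reward = log(1) = 0.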
    def calc_max_length(self, path):
        '''
        For the multi-traveling-salesman problem, compute the longest
        (bottleneck) per-vehicle system time.
        '''
        split_result = self.split_path(path)
        time_lt = []
        for car_path in split_result:
            path_length = 0
            flight_time = 0
            bs_time = 0
            for fr, to in zip(car_path[:-1], car_path[1:]):
                path_length += self.distances[fr, to]
            car_time = path_length * self.car_time_factor
            for offset_rec_idx in car_path[1:-1]:
                flight_time += self.rectangles[offset_rec_idx -
                                               1]['flight_time']
                bs_time += self.rectangles[offset_rec_idx - 1]['bs_time']
            system_time = max(flight_time + car_time, bs_time)
            time_lt.append(system_time)
        return max(time_lt)
    def split_path(self, path):
        # Split the full tour into one sub-path per vehicle at the depot copies.
        split_indices = [i for i, x in enumerate(path) if x in self.center_idx]
        split_result = []
        start = 0
        for idx in split_indices:
            split_result.append(path[start:idx + 1])  # include the split value
            start = idx  # the next sub-path starts at the split value
        # Append the final segment.
        if start < len(path):
            split_result.append(path[start:])
        return split_result
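
    # Example: with center_idx = [16] and path = [0, 3, 5, 16, 7, 2, 0],
    # split_path returns [[0, 3, 5, 16], [16, 7, 2, 0]], i.e. one
    # depot-to-depot sub-path per vehicle, sharing the split city 16.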
    def calc_updates_for_one_rollout(self, path, action_probs, rewards):
        '''
        For the result of a single rollout, compute the corresponding
        qualities and normalizers.
        '''
        qualities = []
        normalizers = []
        for fr, to, reward, action_prob in zip(path[:-1], path[1:], rewards, action_probs):
            log_action_probability = np.log(action_prob)
            qualities.append(- reward*log_action_probability)
            normalizers.append(- (reward + 1)*log_action_probability)
        return qualities, normalizers
    def update(self, path, new_qualities, new_normalizers):
        '''
        Update the qualities and normalizers with an asymptotic (running)
        average.
        '''
        lr = self.learning_rate
        for fr, to, new_quality, new_normalizer in zip(
                path[:-1], path[1:], new_qualities, new_normalizers):
            self.normalizers[fr] = (
                1-lr)*self.normalizers[fr] + lr*new_normalizer
            self.qualities[fr, to] = (
                1-lr)*self.qualities[fr, to] + lr*new_quality
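
    # Each entry therefore follows an exponential moving average,
    #   q[fr, to] <- (1 - lr) * q[fr, to] + lr * q_new,
    # so with lr = 0.01 a single rollout shifts the stored quality by 1% of
    # the gap between the new estimate and the old value.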
    def train_for_one_rollout(self, start_city_id):
        '''
        Training procedure for a single rollout.
        '''
        path, action_probs, rewards = self.rollout(start_city_id=start_city_id)
        new_qualities, new_normalizers = self.calc_updates_for_one_rollout(
            path, action_probs, rewards)
        self.update(path, new_qualities, new_normalizers)
    def train(self, num_epochs=1000):
        '''
        Overall training loop.
        '''
        for epoch in range(num_epochs):
            self.train_for_one_rollout(start_city_id=0)
        return self.best_path_length, self.best_path
def main():
    np.random.seed(42)
    center = np.array([0, 0])
    # cities: [[x1, x2, x3, ...], [y1, y2, y3, ...]] city coordinates
    cities = np.random.random([2, 15]) * np.array([800, 600]).reshape(2, -1)
    # cities = np.array([[10, -10], [0, 0]])
    cities = np.column_stack((center, cities))
    num_cars = 2
    center_idx = []
    for i in range(num_cars - 1):
        cities = np.column_stack((cities, center))
        center_idx.append(cities.shape[1] - 1)

    # calc_max_length() needs per-region 'flight_time' and 'bs_time' values;
    # the placeholders below are assumptions so this standalone demo can run.
    rectangles = [{'flight_time': 10.0, 'bs_time': 5.0}
                  for _ in range(cities.shape[1] - 1)]

    tsp = mTSP(num_cities=cities.shape[1], cities=cities, num_cars=num_cars,
               center_idx=center_idx, rectangles=rectangles)

    # Train the model (params.yml must exist in the working directory).
    tsp.train(1000)

    # Print the final result.
    print(f"Best path: {tsp.best_path}")
    print(f"Path length: {tsp.best_path_length:.2f}")


if __name__ == "__main__":
    main()
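
# One way to create the required params.yml before running this script
# (a sketch only; the values are assumptions, matching the key list read in
# __init__):
#
#   import yaml
#   cfg = {'H': 600, 'W': 800, 'num_cars': 2,
#          'flight_time_factor': 1.0, 'comp_time_factor': 1.0,
#          'trans_time_factor': 1.0, 'car_time_factor': 1.0,
#          'bs_time_factor': 1.0, 'flight_energy_factor': 1.0,
#          'comp_energy_factor': 1.0, 'trans_energy_factor': 1.0,
#          'battery_energy_capacity': 100.0}
#   with open('params.yml', 'w', encoding='utf-8') as f:
#       yaml.safe_dump(cfg, f)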