From 8d79e8cc666f579fdd880da2a138237259598c50 Mon Sep 17 00:00:00 2001
From: weixin_46229132
Date: Fri, 28 Mar 2025 19:57:44 +0800
Subject: [PATCH] mTSP code
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 Q_learning/{TSP.py => TSP_origin.py} |   2 +
 Q_learning/mTSP.py                   | 228 +++++++++++++++++++++++++++
 2 files changed, 230 insertions(+)
 rename Q_learning/{TSP.py => TSP_origin.py} (99%)
 create mode 100644 Q_learning/mTSP.py

diff --git a/Q_learning/TSP.py b/Q_learning/TSP_origin.py
similarity index 99%
rename from Q_learning/TSP.py
rename to Q_learning/TSP_origin.py
index 533ad02..db9f076 100644
--- a/Q_learning/TSP.py
+++ b/Q_learning/TSP_origin.py
@@ -98,6 +98,7 @@ class TSP(object):
 
         # Exponentiate the qualities, used to compute softmax probabilities
         probabilities = np.exp(self.qualities[current_city_id])
+        print(probabilities)
 
         # Set the probability of already-visited cities to zero
         for city_visited in cities_visited:
@@ -210,5 +211,6 @@ async def main():
     print(f"Path length: {tsp.best_path_length:.2f}")
 
 if __name__ == "__main__":
+    np.random.seed(42)
     # Run the async main function with asyncio.run()
     asyncio.run(main())
diff --git a/Q_learning/mTSP.py b/Q_learning/mTSP.py
new file mode 100644
index 0000000..eb56a98
--- /dev/null
+++ b/Q_learning/mTSP.py
@@ -0,0 +1,228 @@
+import numpy as np
+
+
+class TSP(object):
+    '''
+    Solve the TSP with Q-Learning
+    Author: Surfer Zen @ https://www.zhihu.com/people/surfer-zen
+    '''
+
+    def __init__(self,
+                 num_cities=15,
+                 cities=None,
+                 num_cars=2,
+                 center_idx=[0],
+                 alpha=2,
+                 beta=1,
+                 learning_rate=0.001,
+                 eps=0.1,
+                 ):
+        '''
+        Args:
+            num_cities (int): number of cities
+            alpha (float): hyperparameter; the larger it is, the more the agent favors the nearest city
+            beta (float): hyperparameter; the larger it is, the more the agent favors cities likely to lead to the best total distance
+            learning_rate (float): learning rate
+            eps (float): exploration rate; larger values explore more but converge more slowly
+        '''
+        self.num_cities = num_cities
+        self.cities = cities
+        self.num_cars = num_cars
+        self.center_idx = center_idx
+
+        self.alpha = alpha
+        self.beta = beta
+        self.eps = eps
+        self.learning_rate = learning_rate
+        self.distances = self.get_dist_matrix()
+        self.mean_distance = self.distances.mean()
+        self.qualities = np.zeros([num_cities, num_cities])
+        self.normalizers = np.zeros(num_cities)
+        self.best_path = None
+        self.best_path_length = np.inf
+
+    def get_dist_matrix(self):
+        '''
+        Compute the distance matrix from the city coordinates
+        '''
+        dist_matrix = np.zeros([self.num_cities, self.num_cities])
+        for i in range(self.num_cities):
+            for j in range(self.num_cities):
+                if i == j:
+                    continue
+                xi, xj = self.cities[0, i], self.cities[0, j]
+                yi, yj = self.cities[1, i], self.cities[1, j]
+                dist_matrix[i, j] = np.sqrt((xi-xj)**2 + (yi-yj)**2)
+        return dist_matrix
+
+    def rollout(self, start_city_id=None):
+        '''
+        Starting from the depot, follow the policy and move between cities until every city has been visited
+        '''
+        cities_visited = []
+        action_probs = []
+
+        current_city_id = start_city_id
+        cities_visited.append(current_city_id)
+        while len(cities_visited) < self.num_cities:
+            current_city_id, action_prob = self.choose_next_city(
+                cities_visited)
+            cities_visited.append(current_city_id)
+            action_probs.append(action_prob)
+        cities_visited.append(cities_visited[0])
+        action_probs.append(1.0)
+
+        path_length = self.calc_max_length(cities_visited)
+        if path_length < self.best_path_length:
+            self.best_path = cities_visited
+            self.best_path_length = path_length
+        rewards = self.calc_path_rewards(cities_visited, path_length)
+        return cities_visited, action_probs, rewards
+
+    def choose_next_city(self, cities_visited):
+        '''
+        Choose the next city according to the policy
+        '''
+        current_city_id = cities_visited[-1]
+
+        # Exponentiate the qualities, used to compute softmax probabilities
+        probabilities = np.exp(self.qualities[current_city_id])
+
+        # Set the probability of already-visited cities to zero
+        for city_visited in cities_visited:
+            probabilities[city_visited] = 0
+
+        # Compute the softmax probabilities
+        probabilities = probabilities/probabilities.sum()
+
+        if np.random.random() < self.eps:
+            # With probability eps, sample from the softmax distribution
+            next_city_id = np.random.choice(
+                range(len(probabilities)), p=probabilities)
+        else:
+            # With probability (1 - eps), act greedily on the current policy
+            next_city_id = probabilities.argmax()
+
+        # Compute the probability of the chosen action
+        if probabilities.argmax() == next_city_id:
+            action_prob = probabilities[next_city_id]*self.eps + (1-self.eps)
+        else:
+            action_prob = probabilities[next_city_id]*self.eps
+
+        return next_city_id, action_prob
+
+    def calc_path_rewards(self, path, path_length):
+        '''
+        Compute the rewards for a given path
+        Args:
+            path (list[int]): the path; each element is a city id
+            path_length (float): length of the path
+        Returns:
+            rewards: reward for each step; the larger the total distance and the current step's distance, the smaller the reward
+        '''
+        rewards = []
+        for fr, to in zip(path[:-1], path[1:]):
+            dist = self.distances[fr, to]
+            reward = (self.mean_distance/path_length)**self.beta
+            if dist == 0:
+                reward = 1
+            else:
+                reward = reward*(self.mean_distance/dist)**self.alpha
+            rewards.append(np.log(reward))
+        return rewards
+
+    def calc_max_length(self, path):
+        '''
+        Multi-TSP: compute the length of the longest per-vehicle tour
+        '''
+        split_result = self.split_path(path)
+
+        length_lt = []
+        for car_path in split_result:
+            path_length = 0
+            for fr, to in zip(car_path[:-1], car_path[1:]):
+                path_length += self.distances[fr, to]
+            length_lt.append(path_length)
+        return max(length_lt)
+
+    def split_path(self, path):
+        # Split the path into per-vehicle tours
+        split_indices = [i for i, x in enumerate(path) if x in self.center_idx]
+        split_result = []
+        start = 0
+        for idx in split_indices:
+            split_result.append(path[start:idx + 1])  # include the split node
+            start = idx  # next segment starts at the split node
+        # Append the final segment
+        if start < len(path):
+            split_result.append(path[start:])
+        return split_result
+
+    def calc_updates_for_one_rollout(self, path, action_probs, rewards):
+        '''
+        For a single rollout, compute the corresponding qualities and normalizers
+        '''
+        qualities = []
+        normalizers = []
+        for fr, to, reward, action_prob in zip(path[:-1], path[1:], rewards, action_probs):
+            log_action_probability = np.log(action_prob)
+            qualities.append(- reward*log_action_probability)
+            normalizers.append(- (reward + 1)*log_action_probability)
+        return qualities, normalizers
+
+    def update(self, path, new_qualities, new_normalizers):
+        '''
+        Update the qualities and normalizers as running (incremental) averages
+        '''
+        lr = self.learning_rate
+        for fr, to, new_quality, new_normalizer in zip(
+                path[:-1], path[1:], new_qualities, new_normalizers):
+            self.normalizers[fr] = (
+                1-lr)*self.normalizers[fr] + lr*new_normalizer
+            self.qualities[fr, to] = (
+                1-lr)*self.qualities[fr, to] + lr*new_quality
+
+    def train_for_one_rollout(self, start_city_id):
+        '''
+        Training procedure for a single rollout
+        '''
+        path, action_probs, rewards = self.rollout(start_city_id=start_city_id)
+        new_qualities, new_normalizers = self.calc_updates_for_one_rollout(
+            path, action_probs, rewards)
+        self.update(path, new_qualities, new_normalizers)
+
+    def train(self, num_epochs=1000):
+        '''
+        Overall training loop
+        '''
+        for epoch in range(num_epochs):
+            self.train_for_one_rollout(start_city_id=0)
+
+
+def main():
+    np.random.seed(42)
+    center = np.array([0, 0])
+    # cities: [[x1, x2, x3...], [y1, y2, y3...]] city coordinates
+    cities = np.random.random([2, 15]) * np.array([800, 600]).reshape(2, -1)
+    # cities = np.array([[10, -10], [0, 0]])
+    cities = np.column_stack((center, cities))
+
+    num_cars = 2
+    center_idx = []
+    for i in range(num_cars - 1):
+        cities = np.column_stack((cities, center))
+        center_idx.append(cities.shape[1] - 1)
+
+    tsp = TSP(num_cities=cities.shape[1], cities=cities,
+              num_cars=num_cars, center_idx=center_idx)
+
+    # Train the model
+    tsp.train(1000)
+
+    # Print the final path
+    print(f"Best path: {tsp.best_path}")
+    print(f"Path length: {tsp.best_path_length:.2f}")
+
+
+if __name__ == "__main__":
+    main()
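
For reference (not part of the patch), a minimal sketch of how the new split_path logic carves one rollout into per-vehicle tours. The tour below and the depot-duplicate index 16 are invented for illustration; they simply mirror the setup in main() with num_cars=2.

# Illustrative only: same splitting logic as split_path in mTSP.py.
# City 0 is the depot; city 16 is its duplicate added for the second vehicle.
path = [0, 3, 7, 16, 12, 5, 0]   # one rollout: visit all cities, return to the depot
center_idx = [16]                # indices that act as split points

split_indices = [i for i, x in enumerate(path) if x in center_idx]
segments, start = [], 0
for idx in split_indices:
    segments.append(path[start:idx + 1])  # segment includes the split node
    start = idx                           # next segment starts at the split node
if start < len(path):
    segments.append(path[start:])

print(segments)  # [[0, 3, 7, 16], [16, 12, 5, 0]] -> one tour per vehicle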