import numpy as np


class mTSP(object):
    '''
    Q-Learning solver for a multiple-traveling-salesman problem (mTSP).

    Adapted from the single-salesman Q-Learning TSP by Surfer Zen
    (https://www.zhihu.com/people/surfer-zen).
    '''

    def __init__(self,
                 params='params',
                 num_cities=15,
                 cities=None,
                 num_cars=2,
                 center_idx=None,
                 rectangles=None,
                 alpha=1,
                 beta=4,
                 learning_rate=0.01,
                 eps=0.1,
                 ):
        '''
        Args:
            params (str): basename of the YAML config file;
                ``<params>.yml`` is read for the time/energy factors below.
            num_cities (int): number of cities (depot copies included).
            cities (np.ndarray): 2 x num_cities coordinate array
                ([[x1, x2, ...], [y1, y2, ...]]).
            num_cars (int): number of vehicles.
                NOTE(review): silently overwritten by the 'num_cars' entry
                of the YAML file below — confirm which one should win.
            center_idx (list[int] | None): column indices of the duplicated
                depot points used to split a tour into per-car routes;
                defaults to [0].
            rectangles (list[dict] | None): per-city task records holding
                'flight_time' and 'bs_time'; when None those times count as 0.
            alpha (float): the larger, the more the policy prefers the
                nearest unvisited city.
            beta (float): the larger, the more the policy prefers moves that
                look promising for the overall objective.
            learning_rate (float): step size of the running-average updates.
            eps (float): exploration rate; larger explores more but converges
                more slowly.
        '''
        np.random.seed(42)  # fixed seed for reproducible runs
        self.num_cities = num_cities
        self.cities = cities
        self.num_cars = num_cars
        # avoid the mutable-default-argument pitfall: build the list here
        self.center_idx = [0] if center_idx is None else center_idx
        self.rectangles = rectangles
        self.alpha = alpha
        self.beta = beta
        self.eps = eps
        self.learning_rate = learning_rate
        self.distances = self.get_dist_matrix()
        self.mean_distance = self.distances.mean()
        # qualities[i, j]: learned preference for traveling i -> j
        self.qualities = np.zeros([num_cities, num_cities])
        # per-city normalizers maintained alongside the qualities
        self.normalizers = np.zeros(num_cities)
        self.best_path = None
        self.best_path_length = np.inf

        # imported lazily so the module stays importable without PyYAML
        import yaml
        with open(params + '.yml', 'r', encoding='utf-8') as file:
            params = yaml.safe_load(file)
        self.H = params['H']
        self.W = params['W']
        self.num_cars = params['num_cars']
        self.flight_time_factor = params['flight_time_factor']
        self.comp_time_factor = params['comp_time_factor']
        self.trans_time_factor = params['trans_time_factor']
        self.car_time_factor = params['car_time_factor']
        self.bs_time_factor = params['bs_time_factor']
        self.flight_energy_factor = params['flight_energy_factor']
        self.comp_energy_factor = params['comp_energy_factor']
        self.trans_energy_factor = params['trans_energy_factor']
        self.battery_energy_capacity = params['battery_energy_capacity']

    def get_dist_matrix(self):
        '''
        Build the pairwise Euclidean distance matrix from self.cities.

        Returns:
            np.ndarray: (num_cities, num_cities) matrix; diagonal is 0.
        '''
        coords = np.asarray(self.cities, dtype=float)
        # broadcast (2, n, 1) - (2, 1, n) -> per-axis differences
        diff = coords[:, :, None] - coords[:, None, :]
        return np.sqrt((diff ** 2).sum(axis=0))

    def rollout(self, start_city_id=None):
        '''
        Start from the given city and walk under the current policy until
        every city has been visited once, then return to the start.

        Returns:
            (path, action_probs, rewards) for the sampled closed tour.
        '''
        cities_visited = [start_city_id]
        action_probs = []
        while len(cities_visited) < self.num_cities:
            next_city_id, action_prob = self.choose_next_city(cities_visited)
            cities_visited.append(next_city_id)
            action_probs.append(action_prob)
        # close the tour; the final return move is forced, so its prob is 1
        cities_visited.append(cities_visited[0])
        action_probs.append(1.0)
        path_length = self.calc_max_length(cities_visited)
        if path_length < self.best_path_length:
            self.best_path = cities_visited
            self.best_path_length = path_length
        rewards = self.calc_path_rewards(cities_visited, path_length)
        return cities_visited, action_probs, rewards

    def choose_next_city(self, cities_visited):
        '''
        Pick the next city with an epsilon-greedy softmax policy over the
        learned qualities, masking out already-visited cities.

        Returns:
            (next_city_id, action_prob): chosen city and the probability of
            that choice under the mixed (eps-greedy) policy.
        '''
        current_city_id = cities_visited[-1]
        # exponentiate qualities of the current city's outgoing edges
        probabilities = np.exp(self.qualities[current_city_id])
        # already-visited cities must never be chosen again
        for city_visited in cities_visited:
            probabilities[city_visited] = 0
        # normalize into a softmax distribution
        probabilities = probabilities / probabilities.sum()
        if np.random.random() < self.eps:
            # explore: sample from the softmax distribution
            next_city_id = np.random.choice(
                range(len(probabilities)), p=probabilities)
        else:
            # exploit: greedy choice
            next_city_id = probabilities.argmax()
        # probability of this action under the eps-greedy mixture
        if probabilities.argmax() == next_city_id:
            action_prob = probabilities[next_city_id] * self.eps + (1 - self.eps)
        else:
            action_prob = probabilities[next_city_id] * self.eps
        return next_city_id, action_prob

    def calc_path_rewards(self, path, path_length):
        '''
        Compute per-step rewards for a tour.

        Args:
            path (list[int]): city ids along the tour.
            path_length (float): objective value of the tour.

        Returns:
            list[float]: log-rewards; the larger the total objective and the
            longer the individual step, the smaller the reward.
        '''
        rewards = []
        for fr, to in zip(path[:-1], path[1:]):
            dist = self.distances[fr, to]
            reward = (self.mean_distance / path_length) ** self.beta
            if dist == 0:
                # zero-length hops (e.g. depot -> depot copy) get a neutral
                # reward so the log below stays finite
                reward = 1
            else:
                reward = reward * (self.mean_distance / dist) ** self.alpha
            rewards.append(np.log(reward))
        return rewards

    def calc_max_length(self, path):
        '''
        mTSP objective: split the tour into per-car routes and return the
        largest per-route "system time",
        max(flight_time + car_time, bs_time), where car_time is driving time
        along the route and flight/bs times are summed from self.rectangles
        over the route's interior stops (0 when rectangles is None).
        '''
        time_lt = []
        for car_path in self.split_path(path):
            path_length = 0.0
            for fr, to in zip(car_path[:-1], car_path[1:]):
                path_length += self.distances[fr, to]
            car_time = path_length * self.car_time_factor
            flight_time = 0.0
            bs_time = 0.0
            # guard: main() constructs mTSP with rectangles=None; the
            # original indexed into None here and crashed during training
            if self.rectangles is not None:
                # interior stops only; rectangle k - 1 describes city k
                for offset_rec_idx in car_path[1:-1]:
                    flight_time += self.rectangles[offset_rec_idx - 1]['flight_time']
                    bs_time += self.rectangles[offset_rec_idx - 1]['bs_time']
            time_lt.append(max(flight_time + car_time, bs_time))
        return max(time_lt)

    def split_path(self, path):
        '''
        Split a closed tour into per-car routes at every depot copy listed
        in self.center_idx.  Each split index is shared by the two adjacent
        segments, so consecutive routes meet at a depot point.
        '''
        split_indices = [i for i, x in enumerate(path) if x in self.center_idx]
        split_result = []
        start = 0
        for idx in split_indices:
            split_result.append(path[start:idx + 1])  # include the depot
            start = idx  # next segment starts at the same depot
        # trailing part after the last split point
        if start < len(path):
            split_result.append(path[start:])
        return split_result

    def calc_updates_for_one_rollout(self, path, action_probs, rewards):
        '''
        From one rollout, compute target qualities and normalizers per step
        (REINFORCE-style: reward times negative log action probability).
        '''
        qualities = []
        normalizers = []
        for reward, action_prob in zip(rewards, action_probs):
            log_action_probability = np.log(action_prob)
            qualities.append(-reward * log_action_probability)
            normalizers.append(-(reward + 1) * log_action_probability)
        return qualities, normalizers

    def update(self, path, new_qualities, new_normalizers):
        '''
        Blend the new estimates into the stored qualities/normalizers with
        an exponential moving average (asymptotic-mean update).
        '''
        lr = self.learning_rate
        for fr, to, new_quality, new_normalizer in zip(
                path[:-1], path[1:], new_qualities, new_normalizers):
            self.normalizers[fr] = (
                1 - lr) * self.normalizers[fr] + lr * new_normalizer
            self.qualities[fr, to] = (
                1 - lr) * self.qualities[fr, to] + lr * new_quality

    def train_for_one_rollout(self, start_city_id):
        '''Sample one rollout and apply its quality/normalizer updates.'''
        path, action_probs, rewards = self.rollout(start_city_id=start_city_id)
        new_qualities, new_normalizers = self.calc_updates_for_one_rollout(
            path, action_probs, rewards)
        self.update(path, new_qualities, new_normalizers)

    def train(self, num_epochs=1000):
        '''
        Full training loop: num_epochs rollouts, all starting at city 0.

        Returns:
            (best_path_length, best_path) found during training.
        '''
        for _ in range(num_epochs):
            self.train_for_one_rollout(start_city_id=0)
        return self.best_path_length, self.best_path


def main():
    np.random.seed(42)
    center = np.array([0, 0])
    # cities: [[x1, x2, ...], [y1, y2, ...]] coordinates in an 800x600 area
    cities = np.random.random([2, 15]) * np.array([800, 600]).reshape(2, -1)
    cities = np.column_stack((center, cities))
    num_cars = 2
    center_idx = []
    # one extra depot copy per additional car; remember its column index
    for _ in range(num_cars - 1):
        cities = np.column_stack((cities, center))
        center_idx.append(cities.shape[1] - 1)
    tsp = mTSP(num_cities=cities.shape[1], cities=cities,
               num_cars=num_cars, center_idx=center_idx)
    # train the model
    tsp.train(1000)
    # report the best tour found
    print(f"最优路径: {tsp.best_path}")
    print(f"路径长度: {tsp.best_path_length:.2f}")


if __name__ == "__main__":
    main()