diff --git a/dqn.py b/DQN/dqn.py similarity index 60% rename from dqn.py rename to DQN/dqn.py index 343481d..4d90e89 100644 --- a/dqn.py +++ b/DQN/dqn.py @@ -40,54 +40,15 @@ class Agent: self.batch_size = 64 self.optimizer = optim.Adam(self.eval_net.parameters(), lr=self.learning_rate) - # 离散化动作空间 - self.v_cuts_actions = [1, 2, 3, 4, 5] # 垂直切割数选项 - self.h_cuts_actions = [1, 2, 3, 4, 5] # 水平切割数选项 - # self.rho_actions = [0.0, 0.2, 0.4, 0.6, 0.8, 1.0] # 卸载率选项 - - def discretize_action(self, q_values): - """将Q值转换为离散动作""" - action = [] - - # 分别为三个维度选择动作 - idx = 0 - # 垂直切割数 - v_cuts_q = q_values[idx:idx+len(self.v_cuts_actions)] - v_cuts_idx = torch.argmax(v_cuts_q).item() - action.append(self.v_cuts_actions[v_cuts_idx]) - idx += len(self.v_cuts_actions) - - # 水平切割数 - h_cuts_q = q_values[idx:idx+len(self.h_cuts_actions)] - h_cuts_idx = torch.argmax(h_cuts_q).item() - action.append(self.h_cuts_actions[h_cuts_idx]) - idx += len(self.h_cuts_actions) - - # # 卸载率 - # rho_q = q_values[idx:idx+len(self.rho_actions)] - # rho_idx = torch.argmax(rho_q).item() - # action.append(self.rho_actions[rho_idx]) - - return np.array(action) - - def get_action_dim(self): - """获取离散化后的动作空间维度""" - return (len(self.v_cuts_actions) + - len(self.h_cuts_actions)) - # len(self.rho_actions)) - def choose_action(self, state): if random.random() < self.epsilon: # 随机选择动作 - v_cuts = random.choice(self.v_cuts_actions) - h_cuts = random.choice(self.h_cuts_actions) - # rho = random.choice(self.rho_actions) - return np.array([v_cuts, h_cuts]) + return random.randint(0, self.action_dim - 1) else: # 根据Q值选择动作 state = torch.FloatTensor(state).unsqueeze(0) q_values = self.eval_net(state) - return self.discretize_action(q_values[0]) + return torch.argmax(q_values).item() def store_transition(self, state, action, reward, next_state, done): self.memory.append((state, action, reward, next_state, done)) @@ -99,13 +60,13 @@ class Agent: # 随机采样batch batch = random.sample(self.memory, self.batch_size) states = torch.FloatTensor([x[0] for x in batch]) - actions = torch.FloatTensor([x[1] for x in batch]) + actions = torch.LongTensor([x[1] for x in batch]) rewards = torch.FloatTensor([x[2] for x in batch]) next_states = torch.FloatTensor([x[3] for x in batch]) dones = torch.FloatTensor([x[4] for x in batch]) # 计算当前Q值 - current_q_values = self.eval_net(states) + current_q_values = self.eval_net(states).gather(1, actions.unsqueeze(1)) # 计算目标Q值 next_q_values = self.target_net(next_states).detach() @@ -113,7 +74,7 @@ class Agent: target_q_values = rewards + (1 - dones) * self.gamma * max_next_q # 计算损失 - loss = nn.MSELoss()(current_q_values.mean(), target_q_values.mean()) + loss = nn.MSELoss()(current_q_values.squeeze(), target_q_values) # 更新网络 self.optimizer.zero_grad() @@ -126,6 +87,8 @@ class Agent: # 定期更新目标网络 if self.learn.counter % 100 == 0: self.target_net.load_state_dict(self.eval_net.state_dict()) + + self.learn.counter += 1 # 添加计数器属性 learn.counter = 0 diff --git a/env.py b/DQN/env.py similarity index 100% rename from env.py rename to DQN/env.py diff --git a/DQN/env_allocation.py b/DQN/env_allocation.py new file mode 100644 index 0000000..d18def4 --- /dev/null +++ b/DQN/env_allocation.py @@ -0,0 +1,140 @@ +import numpy as np +import gym +from gym import spaces + +class AllocationEnv(gym.Env): + """任务分配环境(第二层)""" + def __init__(self, subareas, num_systems): + super(AllocationEnv, self).__init__() + + self.subareas = subareas # 子区域列表 + self.num_systems = num_systems # 系统数量 + + # 时间系数 + self.flight_time_factor = 3 # 每张照片飞行时间 + self.comp_uav_factor = 5 # 无人机计算时间 + self.trans_time_factor = 0.3 # 传输时间 + self.car_move_time_factor = 100 # 汽车移动时间 + self.comp_bs_factor = 5 # 机巢计算时间 + + # 能量参数 + self.flight_energy_factor = 0.05 # 飞行能耗 + self.comp_energy_factor = 0.05 # 计算能耗 + self.trans_energy_factor = 0.0025 # 传输能耗 + self.battery_capacity = 30 # 电池容量 + + # 动作空间:每个子区域分配给哪个系统 + self.action_space = spaces.MultiDiscrete([num_systems] * len(subareas)) + + # 状态空间:[各系统当前负载] + self.observation_space = spaces.Box( + low=np.zeros(num_systems), + high=np.ones(num_systems) * float('inf'), + dtype=np.float32 + ) + + self.state = None + self.current_step = 0 + self.max_steps = 1000 + + def calculate_rho(self, area): + """计算最优卸载率""" + rho_time_limit = (self.flight_time_factor - self.trans_time_factor) / \ + (self.comp_uav_factor - self.trans_time_factor) + rho_energy_limit = (self.battery_capacity - self.flight_energy_factor * area - self.trans_energy_factor * area) / \ + (self.comp_energy_factor * area - self.trans_energy_factor * area) + if rho_energy_limit < 0: + return None + return min(rho_time_limit, rho_energy_limit) + + def step(self, action): + self.current_step += 1 + + # 初始化每个系统的任务列表 + system_tasks = {i: [] for i in range(self.num_systems)} + + # 根据动作分配任务 + for i, system_id in enumerate(action): + system_tasks[system_id].append(self.subareas[i]) + + # 计算每个系统的完成时间 + system_times = [] + valid_allocation = True + + for system_id, tasks in system_tasks.items(): + if not tasks: # 如果系统没有分配任务 + system_times.append(0) + continue + + # 调用第三层(路径规划)获取结果 + from env_routing import RoutingEnv + route_env = RoutingEnv(tasks) + completion_time, valid = route_env.optimize() + + if not valid: + valid_allocation = False + break + + system_times.append(completion_time) + + total_time = max(system_times) if system_times else 0 + + # 计算奖励 + if not valid_allocation: + reward = -10000 + done = True + else: + reward = -total_time + done = self.current_step >= self.max_steps + + # 更新状态(各系统的负载) + self.state = np.array([len(tasks) for tasks in system_tasks.values()]) + + return self.state, reward, done, {} + + def reset(self): + self.state = np.zeros(self.num_systems) + self.current_step = 0 + return self.state + + def render(self, mode='human'): + pass + + def optimize(self): + """使用DQN优化任务分配""" + from dqn import Agent + + state_dim = self.observation_space.shape[0] + action_dim = self.num_systems * len(self.subareas) + + agent = Agent(state_dim, action_dim) + + # 训练参数 + episodes = 100 # 减少训练轮数,因为这是子问题 + max_steps = 100 + + best_reward = float('-inf') + best_time = float('inf') + valid_solution = False + + for episode in range(episodes): + state = self.reset() + episode_reward = 0 + + for step in range(max_steps): + action = agent.choose_action(state) + next_state, reward, done, _ = self.step(action) + + agent.store_transition(state, action, reward, next_state, done) + agent.learn() + + episode_reward += reward + state = next_state + + if done: + if reward != -10000: # 如果是有效解 + valid_solution = True + best_time = min(best_time, -reward) + break + + return best_time, valid_solution diff --git a/DQN/env_partition.py b/DQN/env_partition.py new file mode 100644 index 0000000..faf7580 --- /dev/null +++ b/DQN/env_partition.py @@ -0,0 +1,88 @@ +import numpy as np +import gym +from gym import spaces + +class PartitionEnv(gym.Env): + """区域划分环境(第一层)""" + def __init__(self): + super(PartitionEnv, self).__init__() + + # 环境参数 + self.H = 20 # 区域高度 + self.W = 25 # 区域宽度 + self.k = 1 # 系统数量 + + # 动作空间:[垂直切割数, 水平切割数] + self.action_space = spaces.Box( + low=np.array([1, 1]), + high=np.array([5, 5]), + dtype=np.float32 + ) + + # 状态空间:[当前垂直切割数, 当前水平切割数, 当前最大完成时间] + self.observation_space = spaces.Box( + low=np.array([1, 1, 0]), + high=np.array([5, 5, float('inf')]), + dtype=np.float32 + ) + + self.state = None + self.current_step = 0 + self.max_steps = 1000 + + def generate_subareas(self, v_cuts, h_cuts): + """生成子区域信息""" + v_boundaries = np.linspace(0, self.H, v_cuts + 1) + h_boundaries = np.linspace(0, self.W, h_cuts + 1) + + subareas = [] + for i in range(len(v_boundaries) - 1): + for j in range(len(h_boundaries) - 1): + height = v_boundaries[i+1] - v_boundaries[i] + width = h_boundaries[j+1] - h_boundaries[j] + center_y = (v_boundaries[i] + v_boundaries[i+1]) / 2 + center_x = (h_boundaries[j] + h_boundaries[j+1]) / 2 + + subareas.append({ + 'height': height, + 'width': width, + 'area': height * width, + 'center': (center_y, center_x) + }) + return subareas + + def step(self, action): + self.current_step += 1 + + # 解析动作 + v_cuts = int(action[0]) # 垂直切割数 + h_cuts = int(action[1]) # 水平切割数 + + # 生成子区域 + subareas = self.generate_subareas(v_cuts, h_cuts) + + # 调用第二层(任务分配)获取结果 + from env_allocation import AllocationEnv + alloc_env = AllocationEnv(subareas, self.k) + total_time, valid = alloc_env.optimize() + + # 计算奖励 + if not valid: + reward = -10000 # 惩罚无效方案 + done = True + else: + reward = -total_time # 负的完成时间作为奖励 + done = self.current_step >= self.max_steps + + # 更新状态 + self.state = np.array([v_cuts, h_cuts, total_time]) + + return self.state, reward, done, {} + + def reset(self): + self.state = np.array([1, 1, 0]) + self.current_step = 0 + return self.state + + def render(self, mode='human'): + pass diff --git a/DQN/env_routing.py b/DQN/env_routing.py new file mode 100644 index 0000000..5cf881b --- /dev/null +++ b/DQN/env_routing.py @@ -0,0 +1,152 @@ +import numpy as np +import gym +from gym import spaces + +class RoutingEnv(gym.Env): + """路径规划环境(第三层)""" + def __init__(self, tasks): + super(RoutingEnv, self).__init__() + + self.tasks = tasks # 任务列表 + self.H = 20 # 区域高度 + self.W = 25 # 区域宽度 + self.region_center = (self.H/2, self.W/2) + + # 时间系数 + self.flight_time_factor = 3 # 每张照片飞行时间 + self.comp_uav_factor = 5 # 无人机计算时间 + self.trans_time_factor = 0.3 # 传输时间 + self.car_move_time_factor = 100 # 汽车移动时间 + self.comp_bs_factor = 5 # 机巢计算时间 + + # 动作空间:选择下一个要访问的任务索引 + self.action_space = spaces.Discrete(len(tasks)) + + # 状态空间:[当前位置x, 当前位置y, 未访问任务的mask] + self.observation_space = spaces.Box( + low=np.array([0, 0] + [0] * len(tasks)), + high=np.array([self.H, self.W] + [1] * len(tasks)), + dtype=np.float32 + ) + + self.state = None + self.current_position = self.region_center + self.unvisited_mask = np.ones(len(tasks)) + self.total_flight_time = 0 + + def calculate_task_time(self, task): + """计算单个任务的执行时间""" + area = task['area'] + + # 计算最优卸载率 + rho_time_limit = (self.flight_time_factor - self.trans_time_factor) / \ + (self.comp_uav_factor - self.trans_time_factor) + rho_energy_limit = (30 - self.flight_time_factor * area - self.trans_time_factor * area) / \ + (self.comp_uav_factor * area - self.trans_time_factor * area) + if rho_energy_limit < 0: + return None, None + rho = min(rho_time_limit, rho_energy_limit) + + # 计算各阶段时间 + flight_time = self.flight_time_factor * area + comp_time = self.comp_uav_factor * rho * area + trans_time = self.trans_time_factor * (1 - rho) * area + comp_bs_time = self.comp_bs_factor * (1 - rho) * area + + task_time = max(flight_time, comp_bs_time) + return task_time, rho + + def calculate_move_time(self, from_pos, to_pos): + """计算移动时间""" + dist = np.sqrt((from_pos[0] - to_pos[0])**2 + (from_pos[1] - to_pos[1])**2) + return dist * self.car_move_time_factor + + def step(self, action): + # 检查动作是否有效 + if self.unvisited_mask[action] == 0: + return self.state, -10000, True, {} # 惩罚选择已访问的任务 + + # 获取选中的任务 + task = self.tasks[action] + task_center = task['center'] + + # 计算移动时间 + move_time = self.calculate_move_time(self.current_position, task_center) + + # 计算任务执行时间 + task_time, rho = self.calculate_task_time(task) + if task_time is None: # 任务不可行 + return self.state, -10000, True, {} + + # 更新状态 + self.current_position = task_center + self.unvisited_mask[action] = 0 + self.total_flight_time += task_time + + # 构建新状态 + self.state = np.concatenate([ + np.array(self.current_position), + self.unvisited_mask + ]) + + # 检查是否所有任务都已完成 + done = np.sum(self.unvisited_mask) == 0 + + # 计算奖励(负的总时间) + total_time = max(self.total_flight_time, move_time) + reward = -total_time if done else -move_time + + return self.state, reward, done, {} + + def reset(self): + self.current_position = self.region_center + self.unvisited_mask = np.ones(len(self.tasks)) + self.total_flight_time = 0 + + self.state = np.concatenate([ + np.array(self.current_position), + self.unvisited_mask + ]) + return self.state + + def render(self, mode='human'): + pass + + def optimize(self): + """使用DQN优化路径规划""" + from dqn import Agent + + state_dim = self.observation_space.shape[0] + action_dim = len(self.tasks) + + agent = Agent(state_dim, action_dim) + + # 训练参数 + episodes = 50 # 进一步减少训练轮数,因为这是最底层子问题 + max_steps = len(self.tasks) + 1 # 最多访问所有任务+返回 + + best_reward = float('-inf') + best_time = float('inf') + valid_solution = False + + for episode in range(episodes): + state = self.reset() + episode_reward = 0 + + for step in range(max_steps): + action = agent.choose_action(state) + next_state, reward, done, _ = self.step(action) + + agent.store_transition(state, action, reward, next_state, done) + agent.learn() + + episode_reward += reward + state = next_state + + if done: + if reward != -10000: # 如果是有效解 + valid_solution = True + best_time = min(best_time, -reward) + break + + return best_time, valid_solution diff --git a/run_dqn.py b/DQN/run_dqn.py similarity index 100% rename from run_dqn.py rename to DQN/run_dqn.py diff --git a/DQN/run_hierarchical.py b/DQN/run_hierarchical.py new file mode 100644 index 0000000..30663c6 --- /dev/null +++ b/DQN/run_hierarchical.py @@ -0,0 +1,118 @@ +from env_partition import PartitionEnv +from env_allocation import AllocationEnv +from env_routing import RoutingEnv +from dqn import Agent +import numpy as np +import matplotlib.pyplot as plt + +def train_hierarchical(): + """训练分层强化学习系统""" + # 创建第一层环境(区域划分) + partition_env = PartitionEnv() + partition_state_dim = partition_env.observation_space.shape[0] + partition_action_dim = 10 # 5个垂直切割选项 + 5个水平切割选项 + + partition_agent = Agent(partition_state_dim, partition_action_dim) + + # 训练参数 + episodes = 1000 + max_steps = 1000 + + # 记录训练过程 + rewards_history = [] + best_reward = float('-inf') + best_solution = None + + # 开始训练 + print("开始训练分层强化学习系统...") + + for episode in range(episodes): + state = partition_env.reset() + episode_reward = 0 + + for step in range(max_steps): + # 选择动作 + action = partition_agent.choose_action(state) + + # 执行动作(这会触发第二层和第三层的优化) + next_state, reward, done, _ = partition_env.step(action) + + # 存储经验 + partition_agent.store_transition(state, action, reward, next_state, done) + + # 学习 + partition_agent.learn() + + episode_reward += reward + state = next_state + + if done: + break + + # 记录每个episode的总奖励 + rewards_history.append(episode_reward) + + # 更新最佳解 + if episode_reward > best_reward: + best_reward = episode_reward + best_solution = { + 'vertical_cuts': int(action[0]), + 'horizontal_cuts': int(action[1]), + 'total_time': -reward if reward != -10000 else float('inf'), + 'episode': episode + } + + # 打印训练进度 + if (episode + 1) % 10 == 0: + avg_reward = np.mean(rewards_history[-10:]) + print(f"Episode {episode + 1}, Average Reward: {avg_reward:.2f}") + + return best_solution, rewards_history + +def plot_training_results(rewards_history): + plt.figure(figsize=(10, 5)) + plt.plot(rewards_history) + plt.title('Hierarchical DQN Training Progress') + plt.xlabel('Episode') + plt.ylabel('Total Reward') + plt.grid(True) + plt.show() + +def print_solution(solution): + print("\n最佳解决方案:") + print(f"在第 {solution['episode']} 轮找到") + print(f"垂直切割数: {solution['vertical_cuts']}") + print(f"水平切割数: {solution['horizontal_cuts']}") + print(f"总完成时间: {solution['total_time']:.2f} 秒") + +def visualize_partition(solution): + """可视化区域划分结果""" + H, W = 20, 25 + v_cuts = solution['vertical_cuts'] + h_cuts = solution['horizontal_cuts'] + + plt.figure(figsize=(10, 8)) + + # 绘制网格 + for i in range(v_cuts + 1): + y = i * (H / v_cuts) + plt.axhline(y=y, color='b', linestyle='-', alpha=0.5) + + for i in range(h_cuts + 1): + x = i * (W / h_cuts) + plt.axvline(x=x, color='b', linestyle='-', alpha=0.5) + + plt.title('Area Partition Visualization') + plt.xlabel('Width') + plt.ylabel('Height') + plt.grid(True, alpha=0.3) + plt.show() + +if __name__ == "__main__": + # 训练模型 + best_solution, rewards_history = train_hierarchical() + + # 显示结果 + plot_training_results(rewards_history) + print_solution(best_solution) + visualize_partition(best_solution) diff --git a/ga.py b/ga.py new file mode 100644 index 0000000..a9f8fdc --- /dev/null +++ b/ga.py @@ -0,0 +1,369 @@ +import math +import random + +import matplotlib.pyplot as plt +import numpy as np + +import plot_util + + +class GA(object): + def __init__(self, num_drones, num_city, num_total, data, to_process_idx, rectangles): + self.num_drones = num_drones + self.num_city = num_city + self.num_total = num_total + self.scores = [] + # self.iteration = iteration + self.location = data + self.to_process_idx = to_process_idx + self.rectangles = rectangles + self.epochs = 3000 + self.ga_choose_ratio = 0.2 + self.mutate_ratio = 0.05 + # fruits中存每一个个体是下标的list + self.dis_mat = self.compute_dis_mat(num_city, data) + # self.fruits = self.greedy_init(self.dis_mat, num_total, num_city) + self.fruits = self.random_init(num_total, num_city) + # 显示初始化后的最佳路径 + scores = self.compute_adp(self.fruits) + sort_index = np.argsort(-scores) + init_best = self.fruits[sort_index[0]] + init_best = self.location[init_best] + + + # 存储每个iteration的结果,画出收敛图 + self.iter_x = [0] + self.iter_y = [1.0 / scores[sort_index[0]]] + + def random_init(self, num_total, num_city): + tmp = [x for x in range(num_city)] + result = [] + for i in range(num_total): + random.shuffle(tmp) + result.append(tmp.copy()) + # print("Lens:", len(result), len(result[0])) + return result + + def greedy_init(self, dis_mat, num_total, num_city): + start_index = 0 + result = [] + for i in range(num_total): + rest = [x for x in range(0, num_city)] + # 所有起始点都已经生成了 + if start_index >= num_city: + start_index = np.random.randint(0, num_city) + result.append(result[start_index].copy()) + continue + current = start_index + rest.remove(current) + # 找到一条最近邻路径 + result_one = [current] + while len(rest) != 0: + tmp_min = math.inf + tmp_choose = -1 + for x in rest: + # print("---", current, x, dis_mat[current][x]) + if dis_mat[current][x] < tmp_min: + tmp_min = dis_mat[current][x] + tmp_choose = x + if tmp_choose == -1: # 此种情况仅可能发生在剩的都是基地点 + tmp_choose = rest[0] + # print("tmp_choose:", tmp_choose) + current = tmp_choose + result_one.append(tmp_choose) + # print(current, rest) + rest.remove(tmp_choose) + # print(rest) + result.append(result_one) + start_index += 1 + # print(len(result), len(result[0])) + return result + + # 计算不同城市之间的距离 + def compute_dis_mat(self, num_city, location): + dis_mat = np.zeros((num_city, num_city)) + for i in range(num_city): + for j in range(num_city): + if i == j: + dis_mat[i][j] = np.inf + continue + a = location[i] + b = location[j] + tmp = np.sqrt(sum([(x[0] - x[1]) ** 2 for x in zip(a, b)])) + dis_mat[i][j] = tmp + + for i in self.to_process_idx: + for j in self.to_process_idx: + # print("processing:", i, j, dis_mat[i][j]) + dis_mat[i][j] = np.inf + + return dis_mat + + # 计算路径长度 + def compute_pathlen(self, tmp_path, dis_mat): + path = tmp_path.copy() + if path[0] not in self.to_process_idx: + path.insert(0, 0) + + if path[-1] not in self.to_process_idx: + path.append(0) + + try: + a = path[0] + b = path[-1] + except: + import pdb + + pdb.set_trace() + + car_infos = [] + # 计算各系统的任务分配 + found_start_points_indices = [] + for i in range(len(path)): + if path[i] in self.to_process_idx: + found_start_points_indices.append(i) + car_paths = [] + for j in range(len(found_start_points_indices) - 1): + from_index = found_start_points_indices[j] + end_index = found_start_points_indices[j + 1] + car_path = [] + for k in range(from_index, end_index + 1): + car_path.append(path[k]) + car_paths.append(car_path) + + # 计算各系统的车辆移动距离 + for car_path in car_paths: + car_time = 0 + for i in range(len(car_path) - 1): + a = car_path[i] + b = car_path[i + 1] + if a in self.to_process_idx and b in self.to_process_idx: + car_time += 0 + else: + car_time += dis_mat[a][b] * 100 # TODO 这里要换成对应参数 + car_move_info = {'car_path': car_path, 'car_time': car_time} + car_infos.append(car_move_info) + + sorted_car_infos = sorted(car_infos, key=lambda x: x['car_time']) + + # 处理任务分配为num_drones + 1的情况,合并列表前两个元素 + if len(car_paths) == self.num_drones + 1: + first = sorted_car_infos[0] + second = sorted_car_infos[1] + merged_path = first['car_path'] + second['car_path'] + merged_time = first['car_time'] + second['car_time'] + merged_dict = {'car_path': merged_path, 'car_time': merged_time} + sorted_car_infos = [merged_dict] + sorted_car_infos[3:] + + # 计算各系统的总时间max(飞行时间+车的时间, 机巢计算时间) + T_k_list = [] + for car_info in sorted_car_infos: + flight_time = 0 + bs_time = 0 + for point in car_info['car_path']: + if point in self.to_process_idx: + continue + else: + flight_time += self.rectangles[point - 1]['flight_time'] # 注意,这里要减一!!! + bs_time += self.rectangles[point - 1]['comp_bs_time'] + system_time = max(flight_time + car_info['car_time'], bs_time) + T_k_list.append(system_time) + T_max = max(T_k_list) + + return T_max + + # 计算种群适应度 + def compute_adp(self, fruits): + adp = [] + for fruit in fruits: + if isinstance(fruit, int): + import pdb + + pdb.set_trace() + length = self.compute_pathlen(fruit, self.dis_mat) + adp.append(1.0 / length) + return np.array(adp) + + def swap_part(self, list1, list2): + index = len(list1) + list = list1 + list2 + list = list[::-1] + return list[:index], list[index:] + + def ga_cross(self, x, y): + len_ = len(x) + assert len(x) == len(y) + path_list = [t for t in range(len_)] + order = list(random.sample(path_list, 2)) + order.sort() + start, end = order + + # 找到冲突点并存下他们的下标,x中存储的是y中的下标,y中存储x与它冲突的下标 + tmp = x[start:end] + x_conflict_index = [] + for sub in tmp: + index = y.index(sub) + if not (index >= start and index < end): + x_conflict_index.append(index) + + y_confict_index = [] + tmp = y[start:end] + for sub in tmp: + index = x.index(sub) + if not (index >= start and index < end): + y_confict_index.append(index) + + assert len(x_conflict_index) == len(y_confict_index) + + # 交叉 + tmp = x[start:end].copy() + x[start:end] = y[start:end] + y[start:end] = tmp + + # 解决冲突 + for index in range(len(x_conflict_index)): + i = x_conflict_index[index] + j = y_confict_index[index] + y[i], x[j] = x[j], y[i] + + assert len(set(x)) == len_ and len(set(y)) == len_ + return list(x), list(y) + + def ga_parent(self, scores, ga_choose_ratio): + sort_index = np.argsort(-scores).copy() + sort_index = sort_index[0 : int(ga_choose_ratio * len(sort_index))] + parents = [] + parents_score = [] + for index in sort_index: + parents.append(self.fruits[index]) + parents_score.append(scores[index]) + return parents, parents_score + + def ga_choose(self, genes_score, genes_choose): + sum_score = sum(genes_score) + score_ratio = [sub * 1.0 / sum_score for sub in genes_score] + rand1 = np.random.rand() + rand2 = np.random.rand() + index1, index2 = 0, 0 + for i, sub in enumerate(score_ratio): + if rand1 >= 0: + rand1 -= sub + if rand1 < 0: + index1 = i + if rand2 >= 0: + rand2 -= sub + if rand2 < 0: + index2 = i + if rand1 < 0 and rand2 < 0: + break + return list(genes_choose[index1]), list(genes_choose[index2]) + + def ga_mutate(self, gene): + path_list = [t for t in range(len(gene))] + order = list(random.sample(path_list, 2)) + start, end = min(order), max(order) + tmp = gene[start:end] + # np.random.shuffle(tmp) + tmp = tmp[::-1] + gene[start:end] = tmp + return list(gene) + + def ga(self): + # 获得优质父代 + scores = self.compute_adp(self.fruits) + # 选择部分优秀个体作为父代候选集合 + parents, parents_score = self.ga_parent(scores, self.ga_choose_ratio) + tmp_best_one = parents[0] + tmp_best_score = parents_score[0] + # 新的种群fruits + fruits = parents.copy() + # 生成新的种群 + while len(fruits) < self.num_total: + # 轮盘赌方式对父代进行选择 + gene_x, gene_y = self.ga_choose(parents_score, parents) + # 交叉 + gene_x_new, gene_y_new = self.ga_cross(gene_x, gene_y) + # 变异 + if np.random.rand() < self.mutate_ratio: + gene_x_new = self.ga_mutate(gene_x_new) + if np.random.rand() < self.mutate_ratio: + gene_y_new = self.ga_mutate(gene_y_new) + x_adp = 1.0 / self.compute_pathlen(gene_x_new, self.dis_mat) + y_adp = 1.0 / self.compute_pathlen(gene_y_new, self.dis_mat) + # 将适应度高的放入种群中 + if x_adp > y_adp and (not gene_x_new in fruits): + fruits.append(gene_x_new) + elif x_adp <= y_adp and (not gene_y_new in fruits): + fruits.append(gene_y_new) + + self.fruits = fruits + + return tmp_best_one, tmp_best_score + + def run(self): + BEST_LIST = None + best_score = -math.inf + self.best_record = [] + early_stop_cnt = 0 + for i in range(self.epochs): + tmp_best_one, tmp_best_score = self.ga() + self.iter_x.append(i) + self.iter_y.append(1.0 / tmp_best_score) + if tmp_best_score > best_score: + best_score = tmp_best_score + BEST_LIST = tmp_best_one + early_stop_cnt = 0 + else: + early_stop_cnt += 1 + if early_stop_cnt == 50: # 若连续50次没有性能提升,则早停 + break + self.best_record.append(1.0 / best_score) + best_length = 1.0 / best_score + # print(f"Epoch {i:3}: {best_length:.3f}") + # print(1.0 / best_score) + return tmp_best_one, 1.0 / best_score + +if __name__ == '__main__': + seed = 42 + num_drones = 6 + num_city = 12 + epochs = 3000 + + # 固定随机数 + np.random.seed(seed) + random.seed(seed) + + + ## 初始化坐标 (第一个点是基地的起点,起点的坐标是 0,0 ) + data = [[0, 0]] + for i in range(num_city - 1): + while True: + x = np.random.randint(-250, 250) + y = np.random.randint(-250, 250) + if x != 0 or y != 0: + break + data.append([x, y]) + + data = np.array(data) + + # 关键:有N架无人机,则再增加N-1个`点` (坐标是起始点),这些点之间的距离是inf + for d in range(num_drones - 1): + data = np.vstack([data, data[0]]) + num_city += 1 # 增加欺骗城市 + + to_process_idx = [0] + # print("start point:", location[0]) + for d in range(1, num_drones): # 1, ... drone-1 + # print("added base point:", location[num_city - d]) + to_process_idx.append(num_city - d) + + model = GA(num_city=data.shape[0], num_total=20, data=data.copy()) + Best_path, Best = model.run() + print(Best_path) + iterations = model.iter_x + best_record = model.iter_y + + # print(Best_path) + + print(f"Best Path Length: {Best:.3f}") + plot_util.plot_results(Best_path, iterations, best_record) diff --git a/hybrid_solver.py b/hybrid_solver.py new file mode 100644 index 0000000..55e89b2 --- /dev/null +++ b/hybrid_solver.py @@ -0,0 +1,172 @@ +import random +import math +import matplotlib.pyplot as plt +import matplotlib.patches as patches +import numpy as np +from ga import GA +import plot_util + +# 固定随机种子,便于复现 +random.seed(42) + +# --------------------------- +# 参数设置 +# --------------------------- +H = 20 # 区域高度,网格点之间的距离为25m(单位距离) +W = 25 # 区域宽度 +k = 1 # 系统数量(车-巢-机系统个数) +num_iterations = 10000 # 蒙特卡洛模拟迭代次数 + +# 时间系数(单位:秒,每个网格一张照片) +flight_time_factor = 3 # 每张照片对应的飞行时间,无人机飞行速度为9.5m/s,拍摄照片的时间间隔为3s +comp_uav_factor = 5 # 无人机上每张照片计算时间,5s +trans_time_factor = 0.3 # 每张照片传输时间,0.3s +car_move_time_factor = 2 * 50 # TODO 汽车每单位距离的移动时间,2s,加了一个放大因子 +comp_bs_factor = 5 # 机巢上每张照片计算时间 + +# 其他参数 +flight_energy_factor = 0.05 # 单位:分钟/张 +comp_energy_factor = 0.05 # 计算能耗需要重新估计 +trans_energy_factor = 0.0025 +battery_capacity = 10 # 无人机只进行飞行,续航为30分钟 +# bs_energy_factor = +# car_energy_factor = + +# --------------------------- +# 蒙特卡洛模拟,寻找最佳方案 +# --------------------------- +best_T = float('inf') +best_solution = None + +for iteration in range(num_iterations): + # 随机生成分区的行分段数与列分段数 + R = random.randint(1, 5) # 行分段数 + C = random.randint(1, 5) # 列分段数 + + # 生成随机的行、列分割边界 + row_boundaries = sorted(random.sample(range(1, H), R - 1)) + row_boundaries = [0] + row_boundaries + [H] + col_boundaries = sorted(random.sample(range(1, W), C - 1)) + col_boundaries = [0] + col_boundaries + [W] + + # --------------------------- + # 根据分割边界生成所有矩形任务 + # --------------------------- + rectangles = [] + valid_partition = True # 标记此分区是否满足所有约束 + for i in range(len(row_boundaries) - 1): + for j in range(len(col_boundaries) - 1): + r1 = row_boundaries[i] + r2 = row_boundaries[i + 1] + c1 = col_boundaries[j] + c2 = col_boundaries[j + 1] + d = (r2 - r1) * (c2 - c1) # 任务的照片数量(矩形面积) + + # 求解rho + rho_time_limit = (flight_time_factor - trans_time_factor) / \ + (comp_uav_factor - trans_time_factor) + rho_energy_limit = (battery_capacity - flight_energy_factor * d - trans_energy_factor * d) / \ + (comp_energy_factor * d - trans_energy_factor * d) + if rho_energy_limit < 0: + valid_partition = False + break + rho = min(rho_time_limit, rho_energy_limit) + # print(rho) + flight_time = flight_time_factor * d + comp_time = comp_uav_factor * rho * d + trans_time = trans_time_factor * (1 - rho) * d + comp_bs_time = comp_bs_factor * (1 - rho) * d + + # 计算任务矩形中心,用于后续车辆移动时间计算 + center_r = (r1 + r2) / 2.0 + center_c = (c1 + c2) / 2.0 + + rectangles.append({ + 'r1': r1, 'r2': r2, 'c1': c1, 'c2': c2, + 'd': d, + 'rho': rho, + 'flight_time': flight_time, + 'comp_time': comp_time, + 'trans_time': trans_time, + 'comp_bs_time': comp_bs_time, + 'center': [center_r, center_c] + }) + if not valid_partition: + break + + # 如果有中存在任务不满足电池约束,则跳过这种分区 + if not valid_partition: + continue + + # --------------------------- + # 使用遗传算法,寻找最佳方案 + # --------------------------- + num_city = len(rectangles) + 1 # 划分好的区域中心点+整个区域的中心 + epochs = 3000 + + # 初始化坐标 (第一个点是整个区域的中心) + center_data = [[H / 2.0, W / 2.0]] + for rec in rectangles: + center_data.append(rec['center']) + center_data = np.array(center_data) + + # 关键:有k架无人机,则再增加N-1个`点` (坐标是起始点),这些点之间的距离是inf + for d in range(k - 1): + center_data = np.vstack([center_data, center_data[0]]) + num_city += 1 # 增加欺骗城市 + + to_process_idx = [0] + # print("start point:", location[0]) + for d in range(1, k): # 1, ... drone-1 + # print("added base point:", location[num_city - d]) + to_process_idx.append(num_city - d) + + model = GA(num_drones=k, num_city=center_data.shape[0], num_total=20, data=center_data.copy(), to_process_idx=to_process_idx, rectangles=rectangles) + Best_path, Best = model.run() + + # 根据最佳路径计算各系统任务分配 + if Best_path[0] not in to_process_idx: + Best_path.insert(0, 0) + + if Best_path[-1] not in to_process_idx: + Best_path.append(0) + + # 计算各系统的任务分配 + system_tasks = [] + found_start_points_indices = [] + for i in range(len(Best_path)): + if Best_path[i] in to_process_idx: + found_start_points_indices.append(i) + for j in range(len(found_start_points_indices) - 1): + from_index = found_start_points_indices[j] + end_index = found_start_points_indices[j + 1] + system_task = [] + for k in range(from_index, end_index + 1): + if Best_path[k] not in to_process_idx: + system_task.append(rectangles[Best_path[k] - 1]) + system_tasks.append(system_task) + + if Best < best_T: + best_T = Best + best_solution = { + 'system_tasks': system_tasks, + # 'T_k_list': T_k_list, + 'best_T': best_T, + 'iteration': iteration, + 'R': R, + 'C': C, + 'row_boundaries': row_boundaries, + 'col_boundaries': col_boundaries + } + +# --------------------------- +# 输出最佳方案 +# --------------------------- +if best_solution is not None: + print("最佳 T (各系统中最长的完成时间):", best_solution['best_T']) + for i in range(k): + num_tasks = len(best_solution['system_tasks'][i]) + print( + f"系统 {i}: 飞行任务数量: {num_tasks}") +else: + print("在给定的模拟次数内未找到满足所有约束的方案。") \ No newline at end of file diff --git a/mtkl_sovler.py b/mtkl_sovler.py new file mode 100644 index 0000000..a02d587 --- /dev/null +++ b/mtkl_sovler.py @@ -0,0 +1,306 @@ +import random +import math +import matplotlib.pyplot as plt +import matplotlib.patches as patches + +# 固定随机种子,便于复现 +random.seed(42) + +# --------------------------- +# 参数设置 +# --------------------------- +H = 20 # 区域高度,网格点之间的距离为25m(单位距离) +W = 25 # 区域宽度 +k = 1 # 系统数量(车-巢-机系统个数) +num_iterations = 1000000 # 蒙特卡洛模拟迭代次数 + +# 时间系数(单位:秒,每个网格一张照片) +flight_time_factor = 3 # 每张照片对应的飞行时间,无人机飞行速度为9.5m/s,拍摄照片的时间间隔为3s +comp_uav_factor = 5 # 无人机上每张照片计算时间,5s +trans_time_factor = 0.3 # 每张照片传输时间,0.3s +car_move_time_factor = 2 * 50 # TODO 汽车每单位距离的移动时间,2s,加了一个放大因子 +comp_bs_factor = 5 # 机巢上每张照片计算时间 + +# 其他参数 +flight_energy_factor = 0.05 # 单位:分钟/张 +comp_energy_factor = 0.05 # 计算能耗需要重新估计 +trans_energy_factor = 0.0025 +battery_capacity = 10 # 无人机只进行飞行,续航为30分钟 +# bs_energy_factor = +# car_energy_factor = + +# --------------------------- +# 蒙特卡洛模拟,寻找最佳方案 +# --------------------------- +best_T = float('inf') +best_solution = None + +for iteration in range(num_iterations): + # 随机生成分区的行分段数与列分段数 + R = random.randint(1, 5) # 行分段数 + C = random.randint(1, 5) # 列分段数 + + # 生成随机的行、列分割边界 + row_boundaries = sorted(random.sample(range(1, H), R - 1)) + row_boundaries = [0] + row_boundaries + [H] + col_boundaries = sorted(random.sample(range(1, W), C - 1)) + col_boundaries = [0] + col_boundaries + [W] + + # --------------------------- + # 根据分割边界生成所有矩形任务 + # --------------------------- + rectangles = [] + valid_partition = True # 标记此分区是否满足所有约束 + for i in range(len(row_boundaries) - 1): + for j in range(len(col_boundaries) - 1): + r1 = row_boundaries[i] + r2 = row_boundaries[i + 1] + c1 = col_boundaries[j] + c2 = col_boundaries[j + 1] + d = (r2 - r1) * (c2 - c1) # 任务的照片数量(矩形面积) + + # # 每个任务随机生成卸载比率 ρ ∈ [0,1] + # rho = random.random() + # # rho = 0.1 + + # # 计算各个阶段时间 + # flight_time = flight_time_factor * d + # comp_time = comp_uav_factor * rho * d + # trans_time = trans_time_factor * (1 - rho) * d + # comp_bs_time = comp_bs_factor * (1 - rho) * d + + # # 检查无人机电池约束: + # # 飞行+计算+传输能耗需不超过电池容量 + # flight_energy = flight_energy_factor * d + # comp_energy = comp_energy_factor * rho * d + # trans_energy = trans_energy_factor * (1 - rho) * d + # total_uav_energy = flight_energy + comp_energy + trans_energy + # # 无人机计算与传输时间不超过飞行时间 + # if (total_uav_energy > battery_capacity) or (comp_time + trans_time > flight_time): + # # TODO 时间约束的rho上界是个常数0.57,如果区域划分定了,rho直接取上界即可,可以数学证明 + # valid_partition = False + # break + + # 求解rho + rho_time_limit = (flight_time_factor - trans_time_factor) / \ + (comp_uav_factor - trans_time_factor) + rho_energy_limit = (battery_capacity - flight_energy_factor * d - trans_energy_factor * d) / \ + (comp_energy_factor * d - trans_energy_factor * d) + if rho_energy_limit < 0: + valid_partition = False + break + rho = min(rho_time_limit, rho_energy_limit) + print(rho) + flight_time = flight_time_factor * d + comp_time = comp_uav_factor * rho * d + trans_time = trans_time_factor * (1 - rho) * d + comp_bs_time = comp_bs_factor * (1 - rho) * d + + # 计算任务矩形中心,用于后续车辆移动时间计算 + center_r = (r1 + r2) / 2.0 + center_c = (c1 + c2) / 2.0 + + rectangles.append({ + 'r1': r1, 'r2': r2, 'c1': c1, 'c2': c2, + 'd': d, + 'rho': rho, + 'flight_time': flight_time, + 'comp_time': comp_time, + 'trans_time': trans_time, + 'comp_bs_time': comp_bs_time, + 'center': (center_r, center_c) + }) + if not valid_partition: + break + + # 如果分区中存在任务不满足电池约束,则跳过该分区 + if not valid_partition: + continue + + # --------------------------- + # 随机将所有矩形任务分配给 k 个系统(车-机-巢) + # --------------------------- + system_tasks = {i: [] for i in range(k)} + for rect in rectangles: + system = random.randint(0, k - 1) + system_tasks[system].append(rect) + + # --------------------------- + # 对于每个系统,计算该系统的总完成时间 T_k: + # T_k = 所有任务的飞行时间之和 + 车辆的移动时间 + # 车辆移动时间:车辆从区域中心出发,依次经过各任务中心(顺序采用距离区域中心的启发式排序) + # --------------------------- + region_center = (H / 2.0, W / 2.0) + T_k_list = [] + for i in range(k): + tasks = system_tasks[i] + tasks.sort(key=lambda r: math.hypot(r['center'][0] - region_center[0], + r['center'][1] - region_center[1])) + total_flight_time = sum(task['flight_time'] for task in tasks) + if tasks: + # 车辆从区域中心到第一个任务中心 + car_time = math.hypot(tasks[0]['center'][0] - region_center[0], + tasks[0]['center'][1] - region_center[1]) * car_move_time_factor + # 依次经过任务中心 + for j in range(1, len(tasks)): + prev_center = tasks[j - 1]['center'] + curr_center = tasks[j]['center'] + car_time += math.hypot(curr_center[0] - prev_center[0], + curr_center[1] - prev_center[1]) * car_move_time_factor + else: + car_time = 0 + + # 机巢的计算时间 + total_bs_time = sum(task['comp_bs_time'] for task in tasks) + T_k = max(total_flight_time + car_time, total_bs_time) + T_k_list.append(T_k) + + T_max = max(T_k_list) # 整体目标 T 为各系统中最大的 T_k + + # TODO 没有限制系统的总能耗 + + if T_max < best_T: + best_T = T_max + best_solution = { + 'system_tasks': system_tasks, + 'T_k_list': T_k_list, + 'T_max': T_max, + 'iteration': iteration, + 'R': R, + 'C': C, + 'row_boundaries': row_boundaries, + 'col_boundaries': col_boundaries + } + +# --------------------------- +# 输出最佳方案 +# --------------------------- +if best_solution is not None: + print("最佳 T (各系统中最长的完成时间):", best_solution['T_max']) + for i in range(k): + num_tasks = len(best_solution['system_tasks'][i]) + print( + f"系统 {i}: 完成时间 T = {best_solution['T_k_list'][i]}, 飞行任务数量: {num_tasks}") +else: + print("在给定的模拟次数内未找到满足所有约束的方案。") + +# 在输出最佳方案后添加详细信息 +if best_solution is not None: + print("\n各系统详细信息:") + region_center = (H / 2.0, W / 2.0) + + for system_id, tasks in best_solution['system_tasks'].items(): + print(f"\n系统 {system_id} 的任务详情:") + + # 按距离区域中心的距离排序任务 + tasks_sorted = sorted(tasks, key=lambda r: math.hypot(r['center'][0] - region_center[0], + r['center'][1] - region_center[1])) + + if tasks_sorted: + print( + f"轨迹路线: 区域中心({region_center[0]:.1f}, {region_center[1]:.1f})", end="") + current_pos = region_center + total_car_time = 0 + total_flight_time = 0 + total_flight_energy = 0 + total_comp_energy = 0 + total_trans_energy = 0 + + for i, task in enumerate(tasks_sorted, 1): + # 计算车辆移动时间 + car_time = math.hypot(task['center'][0] - current_pos[0], + task['center'][1] - current_pos[1]) * car_move_time_factor + total_car_time += car_time + + # 更新当前位置 + current_pos = task['center'] + print( + f" -> 任务{i}({current_pos[0]:.1f}, {current_pos[1]:.1f})", end="") + + # 累加各项数据 + total_flight_time += task['flight_time'] + total_flight_energy += flight_energy_factor * task['d'] + total_comp_energy += comp_energy_factor * \ + task['rho'] * task['d'] + total_trans_energy += trans_energy_factor * \ + (1 - task['rho']) * task['d'] + + print("\n") + print(f"任务数量: {len(tasks_sorted)}") + print(f"车辆总移动时间: {total_car_time:.2f} 秒") + print(f"无人机总飞行时间: {total_flight_time:.2f} 秒") + print(f"能耗统计:") + print(f" - 飞行能耗: {total_flight_energy:.2f} 分钟") + print(f" - 计算能耗: {total_comp_energy:.2f} 分钟") + print(f" - 传输能耗: {total_trans_energy:.2f} 分钟") + print( + f" - 总能耗: {(total_flight_energy + total_comp_energy + total_trans_energy):.2f} 分钟") + + print("\n各任务详细信息:") + for i, task in enumerate(tasks_sorted, 1): + print(f"\n任务{i}:") + print( + f" 位置: ({task['center'][0]:.1f}, {task['center'][1]:.1f})") + print(f" 照片数量: {task['d']}") + print(f" 卸载比率(ρ): {task['rho']:.2f}") + print(f" 飞行时间: {task['flight_time']:.2f} 秒") + print(f" 计算时间: {task['comp_time']:.2f} 秒") + print(f" 传输时间: {task['trans_time']:.2f} 秒") + print(f" -- 飞行能耗: {task['d'] * flight_energy_factor:.2f} 分钟") + print(f" -- 计算能耗: {task['d'] * comp_energy_factor:.2f} 分钟") + print(f" -- 传输能耗: {task['d'] * trans_energy_factor:.2f} 分钟") + print(f" 基站计算时间: {task['comp_bs_time']:.2f} 秒") + else: + print("该系统没有分配任务") + print("-" * 50) + +if best_solution is not None: + plt.rcParams['font.family'] = ['sans-serif'] + plt.rcParams['font.sans-serif'] = ['SimHei'] + fig, ax = plt.subplots() + ax.set_xlim(0, W) + ax.set_ylim(0, H) + ax.set_title("区域划分与车-机-巢系统覆盖") + ax.set_xlabel("区域宽度") + ax.set_ylabel("区域高度") + + # 定义若干颜色以区分不同系统(系统编号从0开始) + colors = ['red', 'blue', 'green', 'orange', 'purple', 'cyan', 'magenta'] + + # 绘制区域中心 + region_center = (W / 2.0, H / 2.0) # 注意:x对应宽度,y对应高度 + ax.plot(region_center[0], region_center[1], + 'ko', markersize=8, label="区域中心") + + # 绘制每个任务区域(矩形)及在矩形中心标注系统编号与卸载比率 ρ + for system_id, tasks in best_solution['system_tasks'].items(): + # 重新按车辆行驶顺序排序(启发式:以任务中心距离区域中心的距离排序) + tasks_sorted = sorted(tasks, key=lambda task: math.hypot( + (task['c1'] + (task['c2'] - task['c1']) / 2.0) - region_center[0], + (task['r1'] + (task['r2'] - task['r1']) / 2.0) - region_center[1] + )) + + for i, task in enumerate(tasks_sorted, 1): + # 绘制矩形:左下角坐标为 (c1, r1),宽度为 (c2 - c1),高度为 (r2 - r1) + rect = patches.Rectangle((task['c1'], task['r1']), + task['c2'] - task['c1'], + task['r2'] - task['r1'], + linewidth=2, + edgecolor=colors[system_id % len(colors)], + facecolor='none') + ax.add_patch(rect) + # 计算矩形中心 + center_x = task['c1'] + (task['c2'] - task['c1']) / 2.0 + center_y = task['r1'] + (task['r2'] - task['r1']) / 2.0 + # 在矩形中心标注:系统编号、执行顺序和卸载比率 ρ + ax.text(center_x, center_y, f"S{system_id}-{i}\nρ={task['rho']:.2f}", + color=colors[system_id % len(colors)], + ha='center', va='center', fontsize=10, fontweight='bold') + + # 添加图例 + ax.legend() + # 反转 y 轴使得行号从上到下递增(如需,可取消) + ax.invert_yaxis() + plt.show() +else: + print("没有找到满足约束条件的方案,无法进行可视化。") diff --git a/plot_util.py b/plot_util.py new file mode 100644 index 0000000..04fd216 --- /dev/null +++ b/plot_util.py @@ -0,0 +1,93 @@ +import matplotlib.pyplot as plt +import numpy as np + +# from matplotlib import colors as mcolors + +# matplotlib_colors = list(dict(mcolors.BASE_COLORS, **mcolors.CSS4_COLORS).keys()) + +matplotlib_colors = [ + "black", + "red", + "yellow", + "grey", + "brown", + "darkred", + "peru", + "darkorange", + "darkkhaki", + "steelblue", + "blue", + "cyan", + "green", + "navajowhite", + "lightgrey", + "lightcoral", + "mediumblue", + "midnightblue", + "blueviolet", + "violet", + "fuchsia", + "mediumvioletred", + "hotpink", + "crimson", + "lightpink", + "slategray", + "lime", + "springgreen", + "teal", + "beige", + "olive", +] + + +def find_indices(list_to_check, item_to_find): + indices = [] + for idx, value in enumerate(list_to_check): + if np.array_equal(value, item_to_find): + indices.append(idx) + return indices + + +def plot_results(Best_path, iterations, best_record): + # print(find_indices(Best_path, [0, 0])) + + # Best_path = np.vstack([Best_path, Best_path[0]]) + # Best_path = np.vstack([Best_path[0], Best_path]) + # print(Best_path[0], Best_path[-1]) + + if not np.array_equal(Best_path[0], [0, 0]): + Best_path = np.vstack([[0, 0], Best_path]) + if not np.array_equal(Best_path[-1], [0, 0]): + Best_path = np.vstack([Best_path, [0, 0]]) + # print(Best_path) + + found_start_points_indices = find_indices(Best_path, [0, 0]) + result_paths = [] + + for j in range(len(found_start_points_indices) - 1): + from_index = found_start_points_indices[j] + end_index = found_start_points_indices[j + 1] + path = [] + for k in range(from_index, end_index + 1): + path.append(Best_path[k]) + path = np.array(path) + result_paths.append(path) + + # print(Best_path) + # print(result_paths) + + fig, axs = plt.subplots(1, 2, sharex=False, sharey=False) + axs[0].scatter(Best_path[:, 0], Best_path[:, 1]) + + for ix, path in enumerate(result_paths): + axs[0].plot(path[:, 0], path[:, 1], color=matplotlib_colors[ix], alpha=0.8) + # axs[0].plot(Best_path[:, 0], Best_path[:, 1], color="green", alpha=0.1) + + # Draw start point + axs[0].plot([0], [0], marker="*", markersize=20, color="red") + + axs[0].set_title("Searched Best Solution") + + axs[1].plot(iterations, best_record) + axs[1].set_title("Convergence Curve") + plt.show()