使用遗传算法求解多旅行商问题

This commit is contained in:
weixin_46229132 2025-03-09 16:53:01 +08:00
parent 34725a8edf
commit 01c6a71b4f
11 changed files with 1445 additions and 44 deletions

View File

@ -40,54 +40,15 @@ class Agent:
self.batch_size = 64
self.optimizer = optim.Adam(self.eval_net.parameters(), lr=self.learning_rate)
# 离散化动作空间
self.v_cuts_actions = [1, 2, 3, 4, 5] # 垂直切割数选项
self.h_cuts_actions = [1, 2, 3, 4, 5] # 水平切割数选项
# self.rho_actions = [0.0, 0.2, 0.4, 0.6, 0.8, 1.0] # 卸载率选项
def discretize_action(self, q_values):
"""将Q值转换为离散动作"""
action = []
# 分别为三个维度选择动作
idx = 0
# 垂直切割数
v_cuts_q = q_values[idx:idx+len(self.v_cuts_actions)]
v_cuts_idx = torch.argmax(v_cuts_q).item()
action.append(self.v_cuts_actions[v_cuts_idx])
idx += len(self.v_cuts_actions)
# 水平切割数
h_cuts_q = q_values[idx:idx+len(self.h_cuts_actions)]
h_cuts_idx = torch.argmax(h_cuts_q).item()
action.append(self.h_cuts_actions[h_cuts_idx])
idx += len(self.h_cuts_actions)
# # 卸载率
# rho_q = q_values[idx:idx+len(self.rho_actions)]
# rho_idx = torch.argmax(rho_q).item()
# action.append(self.rho_actions[rho_idx])
return np.array(action)
def get_action_dim(self):
"""获取离散化后的动作空间维度"""
return (len(self.v_cuts_actions) +
len(self.h_cuts_actions))
# len(self.rho_actions))
def choose_action(self, state):
if random.random() < self.epsilon:
# 随机选择动作
v_cuts = random.choice(self.v_cuts_actions)
h_cuts = random.choice(self.h_cuts_actions)
# rho = random.choice(self.rho_actions)
return np.array([v_cuts, h_cuts])
return random.randint(0, self.action_dim - 1)
else:
# 根据Q值选择动作
state = torch.FloatTensor(state).unsqueeze(0)
q_values = self.eval_net(state)
return self.discretize_action(q_values[0])
return torch.argmax(q_values).item()
def store_transition(self, state, action, reward, next_state, done):
self.memory.append((state, action, reward, next_state, done))
@ -99,13 +60,13 @@ class Agent:
# 随机采样batch
batch = random.sample(self.memory, self.batch_size)
states = torch.FloatTensor([x[0] for x in batch])
actions = torch.FloatTensor([x[1] for x in batch])
actions = torch.LongTensor([x[1] for x in batch])
rewards = torch.FloatTensor([x[2] for x in batch])
next_states = torch.FloatTensor([x[3] for x in batch])
dones = torch.FloatTensor([x[4] for x in batch])
# 计算当前Q值
current_q_values = self.eval_net(states)
current_q_values = self.eval_net(states).gather(1, actions.unsqueeze(1))
# 计算目标Q值
next_q_values = self.target_net(next_states).detach()
@ -113,7 +74,7 @@ class Agent:
target_q_values = rewards + (1 - dones) * self.gamma * max_next_q
# 计算损失
loss = nn.MSELoss()(current_q_values.mean(), target_q_values.mean())
loss = nn.MSELoss()(current_q_values.squeeze(), target_q_values)
# 更新网络
self.optimizer.zero_grad()
@ -126,6 +87,8 @@ class Agent:
# 定期更新目标网络
if self.learn.counter % 100 == 0:
self.target_net.load_state_dict(self.eval_net.state_dict())
self.learn.counter += 1
# 添加计数器属性
learn.counter = 0

View File

140
DQN/env_allocation.py Normal file
View File

@ -0,0 +1,140 @@
import numpy as np
import gym
from gym import spaces
class AllocationEnv(gym.Env):
"""任务分配环境(第二层)"""
def __init__(self, subareas, num_systems):
super(AllocationEnv, self).__init__()
self.subareas = subareas # 子区域列表
self.num_systems = num_systems # 系统数量
# 时间系数
self.flight_time_factor = 3 # 每张照片飞行时间
self.comp_uav_factor = 5 # 无人机计算时间
self.trans_time_factor = 0.3 # 传输时间
self.car_move_time_factor = 100 # 汽车移动时间
self.comp_bs_factor = 5 # 机巢计算时间
# 能量参数
self.flight_energy_factor = 0.05 # 飞行能耗
self.comp_energy_factor = 0.05 # 计算能耗
self.trans_energy_factor = 0.0025 # 传输能耗
self.battery_capacity = 30 # 电池容量
# 动作空间:每个子区域分配给哪个系统
self.action_space = spaces.MultiDiscrete([num_systems] * len(subareas))
# 状态空间:[各系统当前负载]
self.observation_space = spaces.Box(
low=np.zeros(num_systems),
high=np.ones(num_systems) * float('inf'),
dtype=np.float32
)
self.state = None
self.current_step = 0
self.max_steps = 1000
def calculate_rho(self, area):
"""计算最优卸载率"""
rho_time_limit = (self.flight_time_factor - self.trans_time_factor) / \
(self.comp_uav_factor - self.trans_time_factor)
rho_energy_limit = (self.battery_capacity - self.flight_energy_factor * area - self.trans_energy_factor * area) / \
(self.comp_energy_factor * area - self.trans_energy_factor * area)
if rho_energy_limit < 0:
return None
return min(rho_time_limit, rho_energy_limit)
def step(self, action):
self.current_step += 1
# 初始化每个系统的任务列表
system_tasks = {i: [] for i in range(self.num_systems)}
# 根据动作分配任务
for i, system_id in enumerate(action):
system_tasks[system_id].append(self.subareas[i])
# 计算每个系统的完成时间
system_times = []
valid_allocation = True
for system_id, tasks in system_tasks.items():
if not tasks: # 如果系统没有分配任务
system_times.append(0)
continue
# 调用第三层(路径规划)获取结果
from env_routing import RoutingEnv
route_env = RoutingEnv(tasks)
completion_time, valid = route_env.optimize()
if not valid:
valid_allocation = False
break
system_times.append(completion_time)
total_time = max(system_times) if system_times else 0
# 计算奖励
if not valid_allocation:
reward = -10000
done = True
else:
reward = -total_time
done = self.current_step >= self.max_steps
# 更新状态(各系统的负载)
self.state = np.array([len(tasks) for tasks in system_tasks.values()])
return self.state, reward, done, {}
def reset(self):
self.state = np.zeros(self.num_systems)
self.current_step = 0
return self.state
def render(self, mode='human'):
pass
def optimize(self):
"""使用DQN优化任务分配"""
from dqn import Agent
state_dim = self.observation_space.shape[0]
action_dim = self.num_systems * len(self.subareas)
agent = Agent(state_dim, action_dim)
# 训练参数
episodes = 100 # 减少训练轮数,因为这是子问题
max_steps = 100
best_reward = float('-inf')
best_time = float('inf')
valid_solution = False
for episode in range(episodes):
state = self.reset()
episode_reward = 0
for step in range(max_steps):
action = agent.choose_action(state)
next_state, reward, done, _ = self.step(action)
agent.store_transition(state, action, reward, next_state, done)
agent.learn()
episode_reward += reward
state = next_state
if done:
if reward != -10000: # 如果是有效解
valid_solution = True
best_time = min(best_time, -reward)
break
return best_time, valid_solution

88
DQN/env_partition.py Normal file
View File

@ -0,0 +1,88 @@
import numpy as np
import gym
from gym import spaces
class PartitionEnv(gym.Env):
"""区域划分环境(第一层)"""
def __init__(self):
super(PartitionEnv, self).__init__()
# 环境参数
self.H = 20 # 区域高度
self.W = 25 # 区域宽度
self.k = 1 # 系统数量
# 动作空间:[垂直切割数, 水平切割数]
self.action_space = spaces.Box(
low=np.array([1, 1]),
high=np.array([5, 5]),
dtype=np.float32
)
# 状态空间:[当前垂直切割数, 当前水平切割数, 当前最大完成时间]
self.observation_space = spaces.Box(
low=np.array([1, 1, 0]),
high=np.array([5, 5, float('inf')]),
dtype=np.float32
)
self.state = None
self.current_step = 0
self.max_steps = 1000
def generate_subareas(self, v_cuts, h_cuts):
"""生成子区域信息"""
v_boundaries = np.linspace(0, self.H, v_cuts + 1)
h_boundaries = np.linspace(0, self.W, h_cuts + 1)
subareas = []
for i in range(len(v_boundaries) - 1):
for j in range(len(h_boundaries) - 1):
height = v_boundaries[i+1] - v_boundaries[i]
width = h_boundaries[j+1] - h_boundaries[j]
center_y = (v_boundaries[i] + v_boundaries[i+1]) / 2
center_x = (h_boundaries[j] + h_boundaries[j+1]) / 2
subareas.append({
'height': height,
'width': width,
'area': height * width,
'center': (center_y, center_x)
})
return subareas
def step(self, action):
self.current_step += 1
# 解析动作
v_cuts = int(action[0]) # 垂直切割数
h_cuts = int(action[1]) # 水平切割数
# 生成子区域
subareas = self.generate_subareas(v_cuts, h_cuts)
# 调用第二层(任务分配)获取结果
from env_allocation import AllocationEnv
alloc_env = AllocationEnv(subareas, self.k)
total_time, valid = alloc_env.optimize()
# 计算奖励
if not valid:
reward = -10000 # 惩罚无效方案
done = True
else:
reward = -total_time # 负的完成时间作为奖励
done = self.current_step >= self.max_steps
# 更新状态
self.state = np.array([v_cuts, h_cuts, total_time])
return self.state, reward, done, {}
def reset(self):
self.state = np.array([1, 1, 0])
self.current_step = 0
return self.state
def render(self, mode='human'):
pass

152
DQN/env_routing.py Normal file
View File

@ -0,0 +1,152 @@
import numpy as np
import gym
from gym import spaces
class RoutingEnv(gym.Env):
"""路径规划环境(第三层)"""
def __init__(self, tasks):
super(RoutingEnv, self).__init__()
self.tasks = tasks # 任务列表
self.H = 20 # 区域高度
self.W = 25 # 区域宽度
self.region_center = (self.H/2, self.W/2)
# 时间系数
self.flight_time_factor = 3 # 每张照片飞行时间
self.comp_uav_factor = 5 # 无人机计算时间
self.trans_time_factor = 0.3 # 传输时间
self.car_move_time_factor = 100 # 汽车移动时间
self.comp_bs_factor = 5 # 机巢计算时间
# 动作空间:选择下一个要访问的任务索引
self.action_space = spaces.Discrete(len(tasks))
# 状态空间:[当前位置x, 当前位置y, 未访问任务的mask]
self.observation_space = spaces.Box(
low=np.array([0, 0] + [0] * len(tasks)),
high=np.array([self.H, self.W] + [1] * len(tasks)),
dtype=np.float32
)
self.state = None
self.current_position = self.region_center
self.unvisited_mask = np.ones(len(tasks))
self.total_flight_time = 0
def calculate_task_time(self, task):
"""计算单个任务的执行时间"""
area = task['area']
# 计算最优卸载率
rho_time_limit = (self.flight_time_factor - self.trans_time_factor) / \
(self.comp_uav_factor - self.trans_time_factor)
rho_energy_limit = (30 - self.flight_time_factor * area - self.trans_time_factor * area) / \
(self.comp_uav_factor * area - self.trans_time_factor * area)
if rho_energy_limit < 0:
return None, None
rho = min(rho_time_limit, rho_energy_limit)
# 计算各阶段时间
flight_time = self.flight_time_factor * area
comp_time = self.comp_uav_factor * rho * area
trans_time = self.trans_time_factor * (1 - rho) * area
comp_bs_time = self.comp_bs_factor * (1 - rho) * area
task_time = max(flight_time, comp_bs_time)
return task_time, rho
def calculate_move_time(self, from_pos, to_pos):
"""计算移动时间"""
dist = np.sqrt((from_pos[0] - to_pos[0])**2 + (from_pos[1] - to_pos[1])**2)
return dist * self.car_move_time_factor
def step(self, action):
# 检查动作是否有效
if self.unvisited_mask[action] == 0:
return self.state, -10000, True, {} # 惩罚选择已访问的任务
# 获取选中的任务
task = self.tasks[action]
task_center = task['center']
# 计算移动时间
move_time = self.calculate_move_time(self.current_position, task_center)
# 计算任务执行时间
task_time, rho = self.calculate_task_time(task)
if task_time is None: # 任务不可行
return self.state, -10000, True, {}
# 更新状态
self.current_position = task_center
self.unvisited_mask[action] = 0
self.total_flight_time += task_time
# 构建新状态
self.state = np.concatenate([
np.array(self.current_position),
self.unvisited_mask
])
# 检查是否所有任务都已完成
done = np.sum(self.unvisited_mask) == 0
# 计算奖励(负的总时间)
total_time = max(self.total_flight_time, move_time)
reward = -total_time if done else -move_time
return self.state, reward, done, {}
def reset(self):
self.current_position = self.region_center
self.unvisited_mask = np.ones(len(self.tasks))
self.total_flight_time = 0
self.state = np.concatenate([
np.array(self.current_position),
self.unvisited_mask
])
return self.state
def render(self, mode='human'):
pass
def optimize(self):
"""使用DQN优化路径规划"""
from dqn import Agent
state_dim = self.observation_space.shape[0]
action_dim = len(self.tasks)
agent = Agent(state_dim, action_dim)
# 训练参数
episodes = 50 # 进一步减少训练轮数,因为这是最底层子问题
max_steps = len(self.tasks) + 1 # 最多访问所有任务+返回
best_reward = float('-inf')
best_time = float('inf')
valid_solution = False
for episode in range(episodes):
state = self.reset()
episode_reward = 0
for step in range(max_steps):
action = agent.choose_action(state)
next_state, reward, done, _ = self.step(action)
agent.store_transition(state, action, reward, next_state, done)
agent.learn()
episode_reward += reward
state = next_state
if done:
if reward != -10000: # 如果是有效解
valid_solution = True
best_time = min(best_time, -reward)
break
return best_time, valid_solution

118
DQN/run_hierarchical.py Normal file
View File

@ -0,0 +1,118 @@
from env_partition import PartitionEnv
from env_allocation import AllocationEnv
from env_routing import RoutingEnv
from dqn import Agent
import numpy as np
import matplotlib.pyplot as plt
def train_hierarchical():
"""训练分层强化学习系统"""
# 创建第一层环境(区域划分)
partition_env = PartitionEnv()
partition_state_dim = partition_env.observation_space.shape[0]
partition_action_dim = 10 # 5个垂直切割选项 + 5个水平切割选项
partition_agent = Agent(partition_state_dim, partition_action_dim)
# 训练参数
episodes = 1000
max_steps = 1000
# 记录训练过程
rewards_history = []
best_reward = float('-inf')
best_solution = None
# 开始训练
print("开始训练分层强化学习系统...")
for episode in range(episodes):
state = partition_env.reset()
episode_reward = 0
for step in range(max_steps):
# 选择动作
action = partition_agent.choose_action(state)
# 执行动作(这会触发第二层和第三层的优化)
next_state, reward, done, _ = partition_env.step(action)
# 存储经验
partition_agent.store_transition(state, action, reward, next_state, done)
# 学习
partition_agent.learn()
episode_reward += reward
state = next_state
if done:
break
# 记录每个episode的总奖励
rewards_history.append(episode_reward)
# 更新最佳解
if episode_reward > best_reward:
best_reward = episode_reward
best_solution = {
'vertical_cuts': int(action[0]),
'horizontal_cuts': int(action[1]),
'total_time': -reward if reward != -10000 else float('inf'),
'episode': episode
}
# 打印训练进度
if (episode + 1) % 10 == 0:
avg_reward = np.mean(rewards_history[-10:])
print(f"Episode {episode + 1}, Average Reward: {avg_reward:.2f}")
return best_solution, rewards_history
def plot_training_results(rewards_history):
plt.figure(figsize=(10, 5))
plt.plot(rewards_history)
plt.title('Hierarchical DQN Training Progress')
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.grid(True)
plt.show()
def print_solution(solution):
print("\n最佳解决方案:")
print(f"在第 {solution['episode']} 轮找到")
print(f"垂直切割数: {solution['vertical_cuts']}")
print(f"水平切割数: {solution['horizontal_cuts']}")
print(f"总完成时间: {solution['total_time']:.2f}")
def visualize_partition(solution):
"""可视化区域划分结果"""
H, W = 20, 25
v_cuts = solution['vertical_cuts']
h_cuts = solution['horizontal_cuts']
plt.figure(figsize=(10, 8))
# 绘制网格
for i in range(v_cuts + 1):
y = i * (H / v_cuts)
plt.axhline(y=y, color='b', linestyle='-', alpha=0.5)
for i in range(h_cuts + 1):
x = i * (W / h_cuts)
plt.axvline(x=x, color='b', linestyle='-', alpha=0.5)
plt.title('Area Partition Visualization')
plt.xlabel('Width')
plt.ylabel('Height')
plt.grid(True, alpha=0.3)
plt.show()
if __name__ == "__main__":
# 训练模型
best_solution, rewards_history = train_hierarchical()
# 显示结果
plot_training_results(rewards_history)
print_solution(best_solution)
visualize_partition(best_solution)

369
ga.py Normal file
View File

@ -0,0 +1,369 @@
import math
import random
import matplotlib.pyplot as plt
import numpy as np
import plot_util
class GA(object):
def __init__(self, num_drones, num_city, num_total, data, to_process_idx, rectangles):
self.num_drones = num_drones
self.num_city = num_city
self.num_total = num_total
self.scores = []
# self.iteration = iteration
self.location = data
self.to_process_idx = to_process_idx
self.rectangles = rectangles
self.epochs = 3000
self.ga_choose_ratio = 0.2
self.mutate_ratio = 0.05
# fruits中存每一个个体是下标的list
self.dis_mat = self.compute_dis_mat(num_city, data)
# self.fruits = self.greedy_init(self.dis_mat, num_total, num_city)
self.fruits = self.random_init(num_total, num_city)
# 显示初始化后的最佳路径
scores = self.compute_adp(self.fruits)
sort_index = np.argsort(-scores)
init_best = self.fruits[sort_index[0]]
init_best = self.location[init_best]
# 存储每个iteration的结果画出收敛图
self.iter_x = [0]
self.iter_y = [1.0 / scores[sort_index[0]]]
def random_init(self, num_total, num_city):
tmp = [x for x in range(num_city)]
result = []
for i in range(num_total):
random.shuffle(tmp)
result.append(tmp.copy())
# print("Lens:", len(result), len(result[0]))
return result
def greedy_init(self, dis_mat, num_total, num_city):
start_index = 0
result = []
for i in range(num_total):
rest = [x for x in range(0, num_city)]
# 所有起始点都已经生成了
if start_index >= num_city:
start_index = np.random.randint(0, num_city)
result.append(result[start_index].copy())
continue
current = start_index
rest.remove(current)
# 找到一条最近邻路径
result_one = [current]
while len(rest) != 0:
tmp_min = math.inf
tmp_choose = -1
for x in rest:
# print("---", current, x, dis_mat[current][x])
if dis_mat[current][x] < tmp_min:
tmp_min = dis_mat[current][x]
tmp_choose = x
if tmp_choose == -1: # 此种情况仅可能发生在剩的都是基地点
tmp_choose = rest[0]
# print("tmp_choose:", tmp_choose)
current = tmp_choose
result_one.append(tmp_choose)
# print(current, rest)
rest.remove(tmp_choose)
# print(rest)
result.append(result_one)
start_index += 1
# print(len(result), len(result[0]))
return result
# 计算不同城市之间的距离
def compute_dis_mat(self, num_city, location):
dis_mat = np.zeros((num_city, num_city))
for i in range(num_city):
for j in range(num_city):
if i == j:
dis_mat[i][j] = np.inf
continue
a = location[i]
b = location[j]
tmp = np.sqrt(sum([(x[0] - x[1]) ** 2 for x in zip(a, b)]))
dis_mat[i][j] = tmp
for i in self.to_process_idx:
for j in self.to_process_idx:
# print("processing:", i, j, dis_mat[i][j])
dis_mat[i][j] = np.inf
return dis_mat
# 计算路径长度
def compute_pathlen(self, tmp_path, dis_mat):
path = tmp_path.copy()
if path[0] not in self.to_process_idx:
path.insert(0, 0)
if path[-1] not in self.to_process_idx:
path.append(0)
try:
a = path[0]
b = path[-1]
except:
import pdb
pdb.set_trace()
car_infos = []
# 计算各系统的任务分配
found_start_points_indices = []
for i in range(len(path)):
if path[i] in self.to_process_idx:
found_start_points_indices.append(i)
car_paths = []
for j in range(len(found_start_points_indices) - 1):
from_index = found_start_points_indices[j]
end_index = found_start_points_indices[j + 1]
car_path = []
for k in range(from_index, end_index + 1):
car_path.append(path[k])
car_paths.append(car_path)
# 计算各系统的车辆移动距离
for car_path in car_paths:
car_time = 0
for i in range(len(car_path) - 1):
a = car_path[i]
b = car_path[i + 1]
if a in self.to_process_idx and b in self.to_process_idx:
car_time += 0
else:
car_time += dis_mat[a][b] * 100 # TODO 这里要换成对应参数
car_move_info = {'car_path': car_path, 'car_time': car_time}
car_infos.append(car_move_info)
sorted_car_infos = sorted(car_infos, key=lambda x: x['car_time'])
# 处理任务分配为num_drones + 1的情况合并列表前两个元素
if len(car_paths) == self.num_drones + 1:
first = sorted_car_infos[0]
second = sorted_car_infos[1]
merged_path = first['car_path'] + second['car_path']
merged_time = first['car_time'] + second['car_time']
merged_dict = {'car_path': merged_path, 'car_time': merged_time}
sorted_car_infos = [merged_dict] + sorted_car_infos[3:]
# 计算各系统的总时间max(飞行时间+车的时间, 机巢计算时间)
T_k_list = []
for car_info in sorted_car_infos:
flight_time = 0
bs_time = 0
for point in car_info['car_path']:
if point in self.to_process_idx:
continue
else:
flight_time += self.rectangles[point - 1]['flight_time'] # 注意,这里要减一!!!
bs_time += self.rectangles[point - 1]['comp_bs_time']
system_time = max(flight_time + car_info['car_time'], bs_time)
T_k_list.append(system_time)
T_max = max(T_k_list)
return T_max
# 计算种群适应度
def compute_adp(self, fruits):
adp = []
for fruit in fruits:
if isinstance(fruit, int):
import pdb
pdb.set_trace()
length = self.compute_pathlen(fruit, self.dis_mat)
adp.append(1.0 / length)
return np.array(adp)
def swap_part(self, list1, list2):
index = len(list1)
list = list1 + list2
list = list[::-1]
return list[:index], list[index:]
def ga_cross(self, x, y):
len_ = len(x)
assert len(x) == len(y)
path_list = [t for t in range(len_)]
order = list(random.sample(path_list, 2))
order.sort()
start, end = order
# 找到冲突点并存下他们的下标,x中存储的是y中的下标,y中存储x与它冲突的下标
tmp = x[start:end]
x_conflict_index = []
for sub in tmp:
index = y.index(sub)
if not (index >= start and index < end):
x_conflict_index.append(index)
y_confict_index = []
tmp = y[start:end]
for sub in tmp:
index = x.index(sub)
if not (index >= start and index < end):
y_confict_index.append(index)
assert len(x_conflict_index) == len(y_confict_index)
# 交叉
tmp = x[start:end].copy()
x[start:end] = y[start:end]
y[start:end] = tmp
# 解决冲突
for index in range(len(x_conflict_index)):
i = x_conflict_index[index]
j = y_confict_index[index]
y[i], x[j] = x[j], y[i]
assert len(set(x)) == len_ and len(set(y)) == len_
return list(x), list(y)
def ga_parent(self, scores, ga_choose_ratio):
sort_index = np.argsort(-scores).copy()
sort_index = sort_index[0 : int(ga_choose_ratio * len(sort_index))]
parents = []
parents_score = []
for index in sort_index:
parents.append(self.fruits[index])
parents_score.append(scores[index])
return parents, parents_score
def ga_choose(self, genes_score, genes_choose):
sum_score = sum(genes_score)
score_ratio = [sub * 1.0 / sum_score for sub in genes_score]
rand1 = np.random.rand()
rand2 = np.random.rand()
index1, index2 = 0, 0
for i, sub in enumerate(score_ratio):
if rand1 >= 0:
rand1 -= sub
if rand1 < 0:
index1 = i
if rand2 >= 0:
rand2 -= sub
if rand2 < 0:
index2 = i
if rand1 < 0 and rand2 < 0:
break
return list(genes_choose[index1]), list(genes_choose[index2])
def ga_mutate(self, gene):
path_list = [t for t in range(len(gene))]
order = list(random.sample(path_list, 2))
start, end = min(order), max(order)
tmp = gene[start:end]
# np.random.shuffle(tmp)
tmp = tmp[::-1]
gene[start:end] = tmp
return list(gene)
def ga(self):
# 获得优质父代
scores = self.compute_adp(self.fruits)
# 选择部分优秀个体作为父代候选集合
parents, parents_score = self.ga_parent(scores, self.ga_choose_ratio)
tmp_best_one = parents[0]
tmp_best_score = parents_score[0]
# 新的种群fruits
fruits = parents.copy()
# 生成新的种群
while len(fruits) < self.num_total:
# 轮盘赌方式对父代进行选择
gene_x, gene_y = self.ga_choose(parents_score, parents)
# 交叉
gene_x_new, gene_y_new = self.ga_cross(gene_x, gene_y)
# 变异
if np.random.rand() < self.mutate_ratio:
gene_x_new = self.ga_mutate(gene_x_new)
if np.random.rand() < self.mutate_ratio:
gene_y_new = self.ga_mutate(gene_y_new)
x_adp = 1.0 / self.compute_pathlen(gene_x_new, self.dis_mat)
y_adp = 1.0 / self.compute_pathlen(gene_y_new, self.dis_mat)
# 将适应度高的放入种群中
if x_adp > y_adp and (not gene_x_new in fruits):
fruits.append(gene_x_new)
elif x_adp <= y_adp and (not gene_y_new in fruits):
fruits.append(gene_y_new)
self.fruits = fruits
return tmp_best_one, tmp_best_score
def run(self):
BEST_LIST = None
best_score = -math.inf
self.best_record = []
early_stop_cnt = 0
for i in range(self.epochs):
tmp_best_one, tmp_best_score = self.ga()
self.iter_x.append(i)
self.iter_y.append(1.0 / tmp_best_score)
if tmp_best_score > best_score:
best_score = tmp_best_score
BEST_LIST = tmp_best_one
early_stop_cnt = 0
else:
early_stop_cnt += 1
if early_stop_cnt == 50: # 若连续50次没有性能提升则早停
break
self.best_record.append(1.0 / best_score)
best_length = 1.0 / best_score
# print(f"Epoch {i:3}: {best_length:.3f}")
# print(1.0 / best_score)
return tmp_best_one, 1.0 / best_score
if __name__ == '__main__':
seed = 42
num_drones = 6
num_city = 12
epochs = 3000
# 固定随机数
np.random.seed(seed)
random.seed(seed)
## 初始化坐标 (第一个点是基地的起点,起点的坐标是 0,0 )
data = [[0, 0]]
for i in range(num_city - 1):
while True:
x = np.random.randint(-250, 250)
y = np.random.randint(-250, 250)
if x != 0 or y != 0:
break
data.append([x, y])
data = np.array(data)
# 关键有N架无人机则再增加N-1个`点` (坐标是起始点)这些点之间的距离是inf
for d in range(num_drones - 1):
data = np.vstack([data, data[0]])
num_city += 1 # 增加欺骗城市
to_process_idx = [0]
# print("start point:", location[0])
for d in range(1, num_drones): # 1, ... drone-1
# print("added base point:", location[num_city - d])
to_process_idx.append(num_city - d)
model = GA(num_city=data.shape[0], num_total=20, data=data.copy())
Best_path, Best = model.run()
print(Best_path)
iterations = model.iter_x
best_record = model.iter_y
# print(Best_path)
print(f"Best Path Length: {Best:.3f}")
plot_util.plot_results(Best_path, iterations, best_record)

172
hybrid_solver.py Normal file
View File

@ -0,0 +1,172 @@
import random
import math
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import numpy as np
from ga import GA
import plot_util
# 固定随机种子,便于复现
random.seed(42)
# ---------------------------
# 参数设置
# ---------------------------
H = 20 # 区域高度网格点之间的距离为25m单位距离
W = 25 # 区域宽度
k = 1 # 系统数量(车-巢-机系统个数)
num_iterations = 10000 # 蒙特卡洛模拟迭代次数
# 时间系数(单位:秒,每个网格一张照片)
flight_time_factor = 3 # 每张照片对应的飞行时间无人机飞行速度为9.5m/s拍摄照片的时间间隔为3s
comp_uav_factor = 5 # 无人机上每张照片计算时间5s
trans_time_factor = 0.3 # 每张照片传输时间0.3s
car_move_time_factor = 2 * 50 # TODO 汽车每单位距离的移动时间2s加了一个放大因子
comp_bs_factor = 5 # 机巢上每张照片计算时间
# 其他参数
flight_energy_factor = 0.05 # 单位:分钟/张
comp_energy_factor = 0.05 # 计算能耗需要重新估计
trans_energy_factor = 0.0025
battery_capacity = 10 # 无人机只进行飞行续航为30分钟
# bs_energy_factor =
# car_energy_factor =
# ---------------------------
# 蒙特卡洛模拟,寻找最佳方案
# ---------------------------
best_T = float('inf')
best_solution = None
for iteration in range(num_iterations):
# 随机生成分区的行分段数与列分段数
R = random.randint(1, 5) # 行分段数
C = random.randint(1, 5) # 列分段数
# 生成随机的行、列分割边界
row_boundaries = sorted(random.sample(range(1, H), R - 1))
row_boundaries = [0] + row_boundaries + [H]
col_boundaries = sorted(random.sample(range(1, W), C - 1))
col_boundaries = [0] + col_boundaries + [W]
# ---------------------------
# 根据分割边界生成所有矩形任务
# ---------------------------
rectangles = []
valid_partition = True # 标记此分区是否满足所有约束
for i in range(len(row_boundaries) - 1):
for j in range(len(col_boundaries) - 1):
r1 = row_boundaries[i]
r2 = row_boundaries[i + 1]
c1 = col_boundaries[j]
c2 = col_boundaries[j + 1]
d = (r2 - r1) * (c2 - c1) # 任务的照片数量(矩形面积)
# 求解rho
rho_time_limit = (flight_time_factor - trans_time_factor) / \
(comp_uav_factor - trans_time_factor)
rho_energy_limit = (battery_capacity - flight_energy_factor * d - trans_energy_factor * d) / \
(comp_energy_factor * d - trans_energy_factor * d)
if rho_energy_limit < 0:
valid_partition = False
break
rho = min(rho_time_limit, rho_energy_limit)
# print(rho)
flight_time = flight_time_factor * d
comp_time = comp_uav_factor * rho * d
trans_time = trans_time_factor * (1 - rho) * d
comp_bs_time = comp_bs_factor * (1 - rho) * d
# 计算任务矩形中心,用于后续车辆移动时间计算
center_r = (r1 + r2) / 2.0
center_c = (c1 + c2) / 2.0
rectangles.append({
'r1': r1, 'r2': r2, 'c1': c1, 'c2': c2,
'd': d,
'rho': rho,
'flight_time': flight_time,
'comp_time': comp_time,
'trans_time': trans_time,
'comp_bs_time': comp_bs_time,
'center': [center_r, center_c]
})
if not valid_partition:
break
# 如果有中存在任务不满足电池约束,则跳过这种分区
if not valid_partition:
continue
# ---------------------------
# 使用遗传算法,寻找最佳方案
# ---------------------------
num_city = len(rectangles) + 1 # 划分好的区域中心点+整个区域的中心
epochs = 3000
# 初始化坐标 (第一个点是整个区域的中心)
center_data = [[H / 2.0, W / 2.0]]
for rec in rectangles:
center_data.append(rec['center'])
center_data = np.array(center_data)
# 关键有k架无人机则再增加N-1个`点` (坐标是起始点)这些点之间的距离是inf
for d in range(k - 1):
center_data = np.vstack([center_data, center_data[0]])
num_city += 1 # 增加欺骗城市
to_process_idx = [0]
# print("start point:", location[0])
for d in range(1, k): # 1, ... drone-1
# print("added base point:", location[num_city - d])
to_process_idx.append(num_city - d)
model = GA(num_drones=k, num_city=center_data.shape[0], num_total=20, data=center_data.copy(), to_process_idx=to_process_idx, rectangles=rectangles)
Best_path, Best = model.run()
# 根据最佳路径计算各系统任务分配
if Best_path[0] not in to_process_idx:
Best_path.insert(0, 0)
if Best_path[-1] not in to_process_idx:
Best_path.append(0)
# 计算各系统的任务分配
system_tasks = []
found_start_points_indices = []
for i in range(len(Best_path)):
if Best_path[i] in to_process_idx:
found_start_points_indices.append(i)
for j in range(len(found_start_points_indices) - 1):
from_index = found_start_points_indices[j]
end_index = found_start_points_indices[j + 1]
system_task = []
for k in range(from_index, end_index + 1):
if Best_path[k] not in to_process_idx:
system_task.append(rectangles[Best_path[k] - 1])
system_tasks.append(system_task)
if Best < best_T:
best_T = Best
best_solution = {
'system_tasks': system_tasks,
# 'T_k_list': T_k_list,
'best_T': best_T,
'iteration': iteration,
'R': R,
'C': C,
'row_boundaries': row_boundaries,
'col_boundaries': col_boundaries
}
# ---------------------------
# 输出最佳方案
# ---------------------------
if best_solution is not None:
print("最佳 T (各系统中最长的完成时间):", best_solution['best_T'])
for i in range(k):
num_tasks = len(best_solution['system_tasks'][i])
print(
f"系统 {i}: 飞行任务数量: {num_tasks}")
else:
print("在给定的模拟次数内未找到满足所有约束的方案。")

306
mtkl_sovler.py Normal file
View File

@ -0,0 +1,306 @@
import random
import math
import matplotlib.pyplot as plt
import matplotlib.patches as patches
# 固定随机种子,便于复现
random.seed(42)
# ---------------------------
# 参数设置
# ---------------------------
H = 20 # 区域高度网格点之间的距离为25m单位距离
W = 25 # 区域宽度
k = 1 # 系统数量(车-巢-机系统个数)
num_iterations = 1000000 # 蒙特卡洛模拟迭代次数
# 时间系数(单位:秒,每个网格一张照片)
flight_time_factor = 3 # 每张照片对应的飞行时间无人机飞行速度为9.5m/s拍摄照片的时间间隔为3s
comp_uav_factor = 5 # 无人机上每张照片计算时间5s
trans_time_factor = 0.3 # 每张照片传输时间0.3s
car_move_time_factor = 2 * 50 # TODO 汽车每单位距离的移动时间2s加了一个放大因子
comp_bs_factor = 5 # 机巢上每张照片计算时间
# 其他参数
flight_energy_factor = 0.05 # 单位:分钟/张
comp_energy_factor = 0.05 # 计算能耗需要重新估计
trans_energy_factor = 0.0025
battery_capacity = 10 # 无人机只进行飞行续航为30分钟
# bs_energy_factor =
# car_energy_factor =
# ---------------------------
# 蒙特卡洛模拟,寻找最佳方案
# ---------------------------
best_T = float('inf')
best_solution = None
for iteration in range(num_iterations):
# 随机生成分区的行分段数与列分段数
R = random.randint(1, 5) # 行分段数
C = random.randint(1, 5) # 列分段数
# 生成随机的行、列分割边界
row_boundaries = sorted(random.sample(range(1, H), R - 1))
row_boundaries = [0] + row_boundaries + [H]
col_boundaries = sorted(random.sample(range(1, W), C - 1))
col_boundaries = [0] + col_boundaries + [W]
# ---------------------------
# 根据分割边界生成所有矩形任务
# ---------------------------
rectangles = []
valid_partition = True # 标记此分区是否满足所有约束
for i in range(len(row_boundaries) - 1):
for j in range(len(col_boundaries) - 1):
r1 = row_boundaries[i]
r2 = row_boundaries[i + 1]
c1 = col_boundaries[j]
c2 = col_boundaries[j + 1]
d = (r2 - r1) * (c2 - c1) # 任务的照片数量(矩形面积)
# # 每个任务随机生成卸载比率 ρ ∈ [0,1]
# rho = random.random()
# # rho = 0.1
# # 计算各个阶段时间
# flight_time = flight_time_factor * d
# comp_time = comp_uav_factor * rho * d
# trans_time = trans_time_factor * (1 - rho) * d
# comp_bs_time = comp_bs_factor * (1 - rho) * d
# # 检查无人机电池约束:
# # 飞行+计算+传输能耗需不超过电池容量
# flight_energy = flight_energy_factor * d
# comp_energy = comp_energy_factor * rho * d
# trans_energy = trans_energy_factor * (1 - rho) * d
# total_uav_energy = flight_energy + comp_energy + trans_energy
# # 无人机计算与传输时间不超过飞行时间
# if (total_uav_energy > battery_capacity) or (comp_time + trans_time > flight_time):
# # TODO 时间约束的rho上界是个常数0.57如果区域划分定了rho直接取上界即可可以数学证明
# valid_partition = False
# break
# 求解rho
rho_time_limit = (flight_time_factor - trans_time_factor) / \
(comp_uav_factor - trans_time_factor)
rho_energy_limit = (battery_capacity - flight_energy_factor * d - trans_energy_factor * d) / \
(comp_energy_factor * d - trans_energy_factor * d)
if rho_energy_limit < 0:
valid_partition = False
break
rho = min(rho_time_limit, rho_energy_limit)
print(rho)
flight_time = flight_time_factor * d
comp_time = comp_uav_factor * rho * d
trans_time = trans_time_factor * (1 - rho) * d
comp_bs_time = comp_bs_factor * (1 - rho) * d
# 计算任务矩形中心,用于后续车辆移动时间计算
center_r = (r1 + r2) / 2.0
center_c = (c1 + c2) / 2.0
rectangles.append({
'r1': r1, 'r2': r2, 'c1': c1, 'c2': c2,
'd': d,
'rho': rho,
'flight_time': flight_time,
'comp_time': comp_time,
'trans_time': trans_time,
'comp_bs_time': comp_bs_time,
'center': (center_r, center_c)
})
if not valid_partition:
break
# 如果分区中存在任务不满足电池约束,则跳过该分区
if not valid_partition:
continue
# ---------------------------
# 随机将所有矩形任务分配给 k 个系统(车-机-巢)
# ---------------------------
system_tasks = {i: [] for i in range(k)}
for rect in rectangles:
system = random.randint(0, k - 1)
system_tasks[system].append(rect)
# ---------------------------
# 对于每个系统,计算该系统的总完成时间 T_k
# T_k = 所有任务的飞行时间之和 + 车辆的移动时间
# 车辆移动时间:车辆从区域中心出发,依次经过各任务中心(顺序采用距离区域中心的启发式排序)
# ---------------------------
region_center = (H / 2.0, W / 2.0)
T_k_list = []
for i in range(k):
tasks = system_tasks[i]
tasks.sort(key=lambda r: math.hypot(r['center'][0] - region_center[0],
r['center'][1] - region_center[1]))
total_flight_time = sum(task['flight_time'] for task in tasks)
if tasks:
# 车辆从区域中心到第一个任务中心
car_time = math.hypot(tasks[0]['center'][0] - region_center[0],
tasks[0]['center'][1] - region_center[1]) * car_move_time_factor
# 依次经过任务中心
for j in range(1, len(tasks)):
prev_center = tasks[j - 1]['center']
curr_center = tasks[j]['center']
car_time += math.hypot(curr_center[0] - prev_center[0],
curr_center[1] - prev_center[1]) * car_move_time_factor
else:
car_time = 0
# 机巢的计算时间
total_bs_time = sum(task['comp_bs_time'] for task in tasks)
T_k = max(total_flight_time + car_time, total_bs_time)
T_k_list.append(T_k)
T_max = max(T_k_list) # 整体目标 T 为各系统中最大的 T_k
# TODO 没有限制系统的总能耗
if T_max < best_T:
best_T = T_max
best_solution = {
'system_tasks': system_tasks,
'T_k_list': T_k_list,
'T_max': T_max,
'iteration': iteration,
'R': R,
'C': C,
'row_boundaries': row_boundaries,
'col_boundaries': col_boundaries
}
# ---------------------------
# 输出最佳方案
# ---------------------------
if best_solution is not None:
print("最佳 T (各系统中最长的完成时间):", best_solution['T_max'])
for i in range(k):
num_tasks = len(best_solution['system_tasks'][i])
print(
f"系统 {i}: 完成时间 T = {best_solution['T_k_list'][i]}, 飞行任务数量: {num_tasks}")
else:
print("在给定的模拟次数内未找到满足所有约束的方案。")
# 在输出最佳方案后添加详细信息
if best_solution is not None:
print("\n各系统详细信息:")
region_center = (H / 2.0, W / 2.0)
for system_id, tasks in best_solution['system_tasks'].items():
print(f"\n系统 {system_id} 的任务详情:")
# 按距离区域中心的距离排序任务
tasks_sorted = sorted(tasks, key=lambda r: math.hypot(r['center'][0] - region_center[0],
r['center'][1] - region_center[1]))
if tasks_sorted:
print(
f"轨迹路线: 区域中心({region_center[0]:.1f}, {region_center[1]:.1f})", end="")
current_pos = region_center
total_car_time = 0
total_flight_time = 0
total_flight_energy = 0
total_comp_energy = 0
total_trans_energy = 0
for i, task in enumerate(tasks_sorted, 1):
# 计算车辆移动时间
car_time = math.hypot(task['center'][0] - current_pos[0],
task['center'][1] - current_pos[1]) * car_move_time_factor
total_car_time += car_time
# 更新当前位置
current_pos = task['center']
print(
f" -> 任务{i}({current_pos[0]:.1f}, {current_pos[1]:.1f})", end="")
# 累加各项数据
total_flight_time += task['flight_time']
total_flight_energy += flight_energy_factor * task['d']
total_comp_energy += comp_energy_factor * \
task['rho'] * task['d']
total_trans_energy += trans_energy_factor * \
(1 - task['rho']) * task['d']
print("\n")
print(f"任务数量: {len(tasks_sorted)}")
print(f"车辆总移动时间: {total_car_time:.2f}")
print(f"无人机总飞行时间: {total_flight_time:.2f}")
print(f"能耗统计:")
print(f" - 飞行能耗: {total_flight_energy:.2f} 分钟")
print(f" - 计算能耗: {total_comp_energy:.2f} 分钟")
print(f" - 传输能耗: {total_trans_energy:.2f} 分钟")
print(
f" - 总能耗: {(total_flight_energy + total_comp_energy + total_trans_energy):.2f} 分钟")
print("\n各任务详细信息:")
for i, task in enumerate(tasks_sorted, 1):
print(f"\n任务{i}:")
print(
f" 位置: ({task['center'][0]:.1f}, {task['center'][1]:.1f})")
print(f" 照片数量: {task['d']}")
print(f" 卸载比率(ρ): {task['rho']:.2f}")
print(f" 飞行时间: {task['flight_time']:.2f}")
print(f" 计算时间: {task['comp_time']:.2f}")
print(f" 传输时间: {task['trans_time']:.2f}")
print(f" -- 飞行能耗: {task['d'] * flight_energy_factor:.2f} 分钟")
print(f" -- 计算能耗: {task['d'] * comp_energy_factor:.2f} 分钟")
print(f" -- 传输能耗: {task['d'] * trans_energy_factor:.2f} 分钟")
print(f" 基站计算时间: {task['comp_bs_time']:.2f}")
else:
print("该系统没有分配任务")
print("-" * 50)
if best_solution is not None:
plt.rcParams['font.family'] = ['sans-serif']
plt.rcParams['font.sans-serif'] = ['SimHei']
fig, ax = plt.subplots()
ax.set_xlim(0, W)
ax.set_ylim(0, H)
ax.set_title("区域划分与车-机-巢系统覆盖")
ax.set_xlabel("区域宽度")
ax.set_ylabel("区域高度")
# 定义若干颜色以区分不同系统系统编号从0开始
colors = ['red', 'blue', 'green', 'orange', 'purple', 'cyan', 'magenta']
# 绘制区域中心
region_center = (W / 2.0, H / 2.0) # 注意x对应宽度y对应高度
ax.plot(region_center[0], region_center[1],
'ko', markersize=8, label="区域中心")
# 绘制每个任务区域(矩形)及在矩形中心标注系统编号与卸载比率 ρ
for system_id, tasks in best_solution['system_tasks'].items():
# 重新按车辆行驶顺序排序(启发式:以任务中心距离区域中心的距离排序)
tasks_sorted = sorted(tasks, key=lambda task: math.hypot(
(task['c1'] + (task['c2'] - task['c1']) / 2.0) - region_center[0],
(task['r1'] + (task['r2'] - task['r1']) / 2.0) - region_center[1]
))
for i, task in enumerate(tasks_sorted, 1):
# 绘制矩形:左下角坐标为 (c1, r1),宽度为 (c2 - c1),高度为 (r2 - r1)
rect = patches.Rectangle((task['c1'], task['r1']),
task['c2'] - task['c1'],
task['r2'] - task['r1'],
linewidth=2,
edgecolor=colors[system_id % len(colors)],
facecolor='none')
ax.add_patch(rect)
# 计算矩形中心
center_x = task['c1'] + (task['c2'] - task['c1']) / 2.0
center_y = task['r1'] + (task['r2'] - task['r1']) / 2.0
# 在矩形中心标注:系统编号、执行顺序和卸载比率 ρ
ax.text(center_x, center_y, f"S{system_id}-{i}\nρ={task['rho']:.2f}",
color=colors[system_id % len(colors)],
ha='center', va='center', fontsize=10, fontweight='bold')
# 添加图例
ax.legend()
# 反转 y 轴使得行号从上到下递增(如需,可取消)
ax.invert_yaxis()
plt.show()
else:
print("没有找到满足约束条件的方案,无法进行可视化。")

93
plot_util.py Normal file
View File

@ -0,0 +1,93 @@
import matplotlib.pyplot as plt
import numpy as np
# from matplotlib import colors as mcolors
# matplotlib_colors = list(dict(mcolors.BASE_COLORS, **mcolors.CSS4_COLORS).keys())
matplotlib_colors = [
"black",
"red",
"yellow",
"grey",
"brown",
"darkred",
"peru",
"darkorange",
"darkkhaki",
"steelblue",
"blue",
"cyan",
"green",
"navajowhite",
"lightgrey",
"lightcoral",
"mediumblue",
"midnightblue",
"blueviolet",
"violet",
"fuchsia",
"mediumvioletred",
"hotpink",
"crimson",
"lightpink",
"slategray",
"lime",
"springgreen",
"teal",
"beige",
"olive",
]
def find_indices(list_to_check, item_to_find):
indices = []
for idx, value in enumerate(list_to_check):
if np.array_equal(value, item_to_find):
indices.append(idx)
return indices
def plot_results(Best_path, iterations, best_record):
# print(find_indices(Best_path, [0, 0]))
# Best_path = np.vstack([Best_path, Best_path[0]])
# Best_path = np.vstack([Best_path[0], Best_path])
# print(Best_path[0], Best_path[-1])
if not np.array_equal(Best_path[0], [0, 0]):
Best_path = np.vstack([[0, 0], Best_path])
if not np.array_equal(Best_path[-1], [0, 0]):
Best_path = np.vstack([Best_path, [0, 0]])
# print(Best_path)
found_start_points_indices = find_indices(Best_path, [0, 0])
result_paths = []
for j in range(len(found_start_points_indices) - 1):
from_index = found_start_points_indices[j]
end_index = found_start_points_indices[j + 1]
path = []
for k in range(from_index, end_index + 1):
path.append(Best_path[k])
path = np.array(path)
result_paths.append(path)
# print(Best_path)
# print(result_paths)
fig, axs = plt.subplots(1, 2, sharex=False, sharey=False)
axs[0].scatter(Best_path[:, 0], Best_path[:, 1])
for ix, path in enumerate(result_paths):
axs[0].plot(path[:, 0], path[:, 1], color=matplotlib_colors[ix], alpha=0.8)
# axs[0].plot(Best_path[:, 0], Best_path[:, 1], color="green", alpha=0.1)
# Draw start point
axs[0].plot([0], [0], marker="*", markersize=20, color="red")
axs[0].set_title("Searched Best Solution")
axs[1].plot(iterations, best_record)
axs[1].set_title("Convergence Curve")
plt.show()