HPCC2025/GA/hybrid_solver.py
2025-03-11 15:46:11 +08:00

172 lines
6.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import random
import math
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import numpy as np
from ga import GA
import plot_util
# 固定随机种子,便于复现
random.seed(42)
# ---------------------------
# 参数设置
# ---------------------------
H = 20 # 区域高度网格点之间的距离为25m单位距离
W = 25 # 区域宽度
k = 1 # 系统数量(车-巢-机系统个数)
num_iterations = 10000 # 蒙特卡洛模拟迭代次数
# 时间系数(单位:秒,每个网格一张照片)
flight_time_factor = 3 # 每张照片对应的飞行时间无人机飞行速度为9.5m/s拍摄照片的时间间隔为3s
comp_uav_factor = 5 # 无人机上每张照片计算时间5s
trans_time_factor = 0.3 # 每张照片传输时间0.3s
car_move_time_factor = 2 * 50 # TODO 汽车每单位距离的移动时间2s加了一个放大因子
comp_bs_factor = 5 # 机巢上每张照片计算时间
# 其他参数
flight_energy_factor = 0.05 # 单位:分钟/张
comp_energy_factor = 0.05 # 计算能耗需要重新估计
trans_energy_factor = 0.0025
battery_capacity = 10 # 无人机只进行飞行续航为30分钟
# bs_energy_factor =
# car_energy_factor =
# ---------------------------
# 蒙特卡洛模拟,寻找最佳方案
# ---------------------------
best_T = float('inf')
best_solution = None
for iteration in range(num_iterations):
# 随机生成分区的行分段数与列分段数
R = random.randint(1, 5) # 行分段数
C = random.randint(1, 5) # 列分段数
# 生成随机的行、列分割边界
row_boundaries = sorted(random.sample(range(1, H), R - 1))
row_boundaries = [0] + row_boundaries + [H]
col_boundaries = sorted(random.sample(range(1, W), C - 1))
col_boundaries = [0] + col_boundaries + [W]
# ---------------------------
# 根据分割边界生成所有矩形任务
# ---------------------------
rectangles = []
valid_partition = True # 标记此分区是否满足所有约束
for i in range(len(row_boundaries) - 1):
for j in range(len(col_boundaries) - 1):
r1 = row_boundaries[i]
r2 = row_boundaries[i + 1]
c1 = col_boundaries[j]
c2 = col_boundaries[j + 1]
d = (r2 - r1) * (c2 - c1) # 任务的照片数量(矩形面积)
# 求解rho
rho_time_limit = (flight_time_factor - trans_time_factor) / \
(comp_uav_factor - trans_time_factor)
rho_energy_limit = (battery_capacity - flight_energy_factor * d - trans_energy_factor * d) / \
(comp_energy_factor * d - trans_energy_factor * d)
if rho_energy_limit < 0:
valid_partition = False
break
rho = min(rho_time_limit, rho_energy_limit)
# print(rho)
flight_time = flight_time_factor * d
comp_time = comp_uav_factor * rho * d
trans_time = trans_time_factor * (1 - rho) * d
comp_bs_time = comp_bs_factor * (1 - rho) * d
# 计算任务矩形中心,用于后续车辆移动时间计算
center_r = (r1 + r2) / 2.0
center_c = (c1 + c2) / 2.0
rectangles.append({
'r1': r1, 'r2': r2, 'c1': c1, 'c2': c2,
'd': d,
'rho': rho,
'flight_time': flight_time,
'comp_time': comp_time,
'trans_time': trans_time,
'comp_bs_time': comp_bs_time,
'center': [center_r, center_c]
})
if not valid_partition:
break
# 如果有中存在任务不满足电池约束,则跳过这种分区
if not valid_partition:
continue
# ---------------------------
# 使用遗传算法,寻找最佳方案
# ---------------------------
num_city = len(rectangles) + 1 # 划分好的区域中心点+整个区域的中心
epochs = 3000
# 初始化坐标 (第一个点是整个区域的中心)
center_data = [[H / 2.0, W / 2.0]]
for rec in rectangles:
center_data.append(rec['center'])
center_data = np.array(center_data)
# 关键有k架无人机则再增加N-1个`点` (坐标是起始点)这些点之间的距离是inf
for d in range(k - 1):
center_data = np.vstack([center_data, center_data[0]])
num_city += 1 # 增加欺骗城市
to_process_idx = [0]
# print("start point:", location[0])
for d in range(1, k): # 1, ... drone-1
# print("added base point:", location[num_city - d])
to_process_idx.append(num_city - d)
model = GA(num_drones=k, num_city=center_data.shape[0], num_total=20, data=center_data.copy(), to_process_idx=to_process_idx, rectangles=rectangles)
Best_path, Best = model.run()
# 根据最佳路径计算各系统任务分配
if Best_path[0] not in to_process_idx:
Best_path.insert(0, 0)
if Best_path[-1] not in to_process_idx:
Best_path.append(0)
# 计算各系统的任务分配
system_tasks = []
found_start_points_indices = []
for i in range(len(Best_path)):
if Best_path[i] in to_process_idx:
found_start_points_indices.append(i)
for j in range(len(found_start_points_indices) - 1):
from_index = found_start_points_indices[j]
end_index = found_start_points_indices[j + 1]
system_task = []
for k in range(from_index, end_index + 1):
if Best_path[k] not in to_process_idx:
system_task.append(rectangles[Best_path[k] - 1])
system_tasks.append(system_task)
if Best < best_T:
best_T = Best
best_solution = {
'system_tasks': system_tasks,
# 'T_k_list': T_k_list,
'best_T': best_T,
'iteration': iteration,
'R': R,
'C': C,
'row_boundaries': row_boundaries,
'col_boundaries': col_boundaries
}
# ---------------------------
# 输出最佳方案
# ---------------------------
if best_solution is not None:
print("最佳 T (各系统中最长的完成时间):", best_solution['best_T'])
for i in range(k):
num_tasks = len(best_solution['system_tasks'][i])
print(
f"系统 {i}: 飞行任务数量: {num_tasks}")
else:
print("在给定的模拟次数内未找到满足所有约束的方案。")