HPCC2025/mtkl_sovler2.py
2025-04-01 17:46:23 +08:00

192 lines
7.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import random
import math
import yaml
import json
import numpy as np
from tqdm import tqdm
# 固定随机种子,便于复现
random.seed(42)
# ---------------------------
# 需要修改的超参数
# ---------------------------
num_iterations = 3000000000
# 随机生成分区的行分段数与列分段数
R = random.randint(0, 3) # 行分段数
C = random.randint(0, 3) # 列分段数
# R = 3
# C = 3
params_file = 'params_50_50_3'
with open(params_file + '.yml', 'r', encoding='utf-8') as file:
params = yaml.safe_load(file)
H = params['H']
W = params['W']
k = params['num_cars']
flight_time_factor = params['flight_time_factor']
comp_time_factor = params['comp_time_factor']
trans_time_factor = params['trans_time_factor']
car_time_factor = params['car_time_factor']
bs_time_factor = params['bs_time_factor']
flight_energy_factor = params['flight_energy_factor']
comp_energy_factor = params['comp_energy_factor']
trans_energy_factor = params['trans_energy_factor']
battery_energy_capacity = params['battery_energy_capacity']
# ---------------------------
# 蒙特卡洛模拟,寻找最佳方案
# ---------------------------
best_T = float('inf')
best_solution = None
for iteration in tqdm(range(num_iterations), desc="蒙特卡洛模拟进度"):
# 直接切值
# horiz = [random.random() for _ in range(R)]
horiz = [random.randint(1, 999)/1000 for _ in range(R)]
horiz = sorted(set(horiz))
horiz = horiz if horiz else []
row_boundaries = [0] + horiz + [1]
row_boundaries = [boundary * H for boundary in row_boundaries]
# vert = [random.random() for _ in range(C)]
vert = [random.randint(1, 999)/1000 for _ in range(C)]
vert = sorted(set(vert))
vert = vert if vert else []
col_boundaries = [0] + vert + [1]
col_boundaries = [boundary * W for boundary in col_boundaries]
# ---------------------------
# 根据分割边界生成所有矩形任务
# ---------------------------
rectangles = []
valid_partition = True # 标记此分区是否满足所有约束
for i in range(len(row_boundaries) - 1):
for j in range(len(col_boundaries) - 1):
r1 = row_boundaries[i]
r2 = row_boundaries[i + 1]
c1 = col_boundaries[j]
c2 = col_boundaries[j + 1]
d = (r2 - r1) * (c2 - c1) # 任务的照片数量(矩形面积)
# 求解rho
rho_time_limit = (flight_time_factor - trans_time_factor) / \
(comp_time_factor - trans_time_factor)
rho_energy_limit = (battery_energy_capacity - flight_energy_factor * d - trans_energy_factor * d) / \
(comp_energy_factor * d - trans_energy_factor * d)
if rho_energy_limit < 0:
valid_partition = False
break
rho = min(rho_time_limit, rho_energy_limit)
flight_time = flight_time_factor * d
comp_time = comp_time_factor * rho * d
trans_time = trans_time_factor * (1 - rho) * d
bs_time = bs_time_factor * (1 - rho) * d
# 计算任务矩形中心,用于后续车辆移动时间计算
center_r = (r1 + r2) / 2.0
center_c = (c1 + c2) / 2.0
rectangles.append({
'r1': r1, 'r2': r2, 'c1': c1, 'c2': c2,
'd': d,
'rho': rho,
'flight_time': flight_time,
'comp_time': comp_time,
'trans_time': trans_time,
'bs_time': bs_time,
'center': (center_r, center_c)
})
if not valid_partition:
break
# 如果分区中存在任务不满足电池约束,则跳过该分区
if not valid_partition:
continue
# ---------------------------
# 随机将所有矩形任务分配给 k 个系统(车-机-巢)
# ---------------------------
car_paths = [[] for _ in range(k)]
for i in range(len(row_boundaries) - 1):
for j in range(len(col_boundaries) - 1):
car_idx = random.randint(0, k - 1)
car_paths[car_idx].append(i * (len(col_boundaries) - 1) + j)
# ---------------------------
# 对于每个系统,计算该系统的总完成时间 T_k
# T_k = 所有任务的飞行时间之和 + 车辆的移动时间
# 车辆移动时间:车辆从区域中心出发,依次经过各任务中心(顺序采用距离区域中心的启发式排序)
# ---------------------------
region_center = (H / 2.0, W / 2.0)
T_k_list = []
for i in range(k):
car_path = car_paths[i]
car_path.sort(key=lambda r: math.dist(
rectangles[r]['center'], region_center))
total_flight_time = sum(
rectangles[point]['flight_time'] for point in car_path)
if car_path:
# 车辆从区域中心到第一个任务中心
car_time = math.dist(rectangles[car_path[0]]['center'],
region_center) * car_time_factor
# 依次经过任务中心
for j in range(len(car_path) - 1):
prev_center = rectangles[car_path[j]]['center']
curr_center = rectangles[car_path[j + 1]]['center']
car_time += math.dist(curr_center,
prev_center) * car_time_factor
# 回到区域中心
car_time += math.dist(region_center, curr_center) * car_time_factor
else:
car_time = 0
# 机巢的计算时间
total_bs_time = sum(rectangles[point]['bs_time'] for point in car_path)
T_k = max(total_flight_time + car_time, total_bs_time)
T_k_list.append(T_k)
T_max = max(T_k_list) # 整体目标 T 为各系统中最大的 T_k
if T_max < best_T:
best_T = T_max
best_solution = {
'car_paths': car_paths,
'T_k_list': T_k_list,
'T_max': T_max,
'iteration': iteration,
'R': R,
'C': C,
'row_boundaries': row_boundaries,
'col_boundaries': col_boundaries,
'car_time': car_time,
'flight_time': total_flight_time,
'bs_time': total_bs_time
}
print(iteration)
# ---------------------------
# 输出最佳方案
# ---------------------------
if best_solution is not None:
print("最佳 T:", best_solution['T_max'])
print("Row boundaries:", best_solution['row_boundaries'])
print("Col boundaries:", best_solution['col_boundaries'])
print("最佳路径:", best_solution['car_paths'])
# 保存分区边界和车辆轨迹到JSON文件
output_data = {
'row_boundaries': [boundary / H for boundary in best_solution['row_boundaries']],
'col_boundaries': [boundary / W for boundary in best_solution['col_boundaries']],
'car_paths': best_solution['car_paths']
}
with open(f'./solutions/mtkl_{params_file}.json', 'w', encoding='utf-8') as f:
json.dump(output_data, f, ensure_ascii=False, indent=4)
else:
print("在给定的模拟次数内未找到满足所有约束的方案。")