HPCC2025/env.py

291 lines
14 KiB
Python
Raw Normal View History

2025-03-11 16:01:07 +08:00
import gymnasium as gym
from gymnasium import spaces
import numpy as np
2025-03-11 19:43:04 +08:00
import yaml
import math
2025-03-11 16:01:07 +08:00
class PartitionMazeEnv(gym.Env):
    """Three-phase environment: area partitioning, fleet placement, maze walk.

    Phase 0 (partition): ``CUT_NUM`` steps. Every action is one scalar that is
    discretised to {0, 0.1, ..., 0.9}. The first half of the values are the
    vertical (column) cut positions, the second half the horizontal (row) cut
    positions; 0 means "no cut".

    Phase 1 (maze initialisation): one step per car. The action selects the
    region whose centre becomes that car's starting position.

    Phase 2 (maze walk): cars take turns moving up/down/left/right (or
    staying) on the grid of regions until every region has been visited or
    ``MAX_STEPS`` moves have been made.

    Observations are ``CUT_NUM + 2 * num_cars`` float32 values: the partition
    values followed by the (row-axis, column-axis) position of every car.
    """

    def __init__(self, config=None):
        """Load the fleet parameters from ``params.yml`` and build the spaces.

        Args:
            config: Accepted for interface compatibility; currently unused.
        """
        super().__init__()

        # Fleet parameters.
        with open('params.yml', 'r', encoding='utf-8') as file:
            params = yaml.safe_load(file)

        self.H = params['H']
        self.W = params['W']
        self.num_cars = params['num_cars']

        self.flight_time_factor = params['flight_time_factor']
        self.comp_time_factor = params['comp_time_factor']
        self.trans_time_factor = params['trans_time_factor']
        self.car_time_factor = params['car_time_factor']
        self.bs_time_factor = params['bs_time_factor']

        self.flight_energy_factor = params['flight_energy_factor']
        self.comp_energy_factor = params['comp_energy_factor']
        self.trans_energy_factor = params['trans_energy_factor']
        self.battery_energy_capacity = params['battery_energy_capacity']

        ##############################
        # Hyper-parameters that may need manual tuning
        ##############################
        self.CUT_NUM = 4  # half of the cuts are vertical, half horizontal
        # Baseline time, obtained from a greedy or Monte-Carlo computation.
        self.BASE_LINE = 3500.0

        # 0: partition phase, 1: maze initialisation phase, 2: maze walk phase.
        self.phase = 0
        self.partition_step = 0  # number of partition decisions taken so far
        self.partition_values = np.zeros(self.CUT_NUM, dtype=np.float32)

        # Every action, in every phase, is a single continuous value in [0, 1].
        self.action_space = spaces.Box(
            low=0.0, high=1.0, shape=(1,), dtype=np.float32)

        # Partition values stay within [0, 0.9]; car positions are absolute
        # coordinates, so they are bounded by H (first component) and W
        # (second component) rather than by 1.
        # TODO: the state currently only carries the position coordinates.
        obs_high = np.concatenate([
            np.ones(self.CUT_NUM, dtype=np.float32),
            np.tile(np.array([self.H, self.W], dtype=np.float32),
                    self.num_cars),
        ])
        self.observation_space = spaces.Box(
            low=np.zeros_like(obs_high), high=obs_high, dtype=np.float32)

        # Partition-phase bookkeeping.
        self.col_cuts = []  # vertical cut boundaries, filled after phase 0
        self.row_cuts = []  # horizontal cut boundaries, filled after phase 0

        self.init_maze_step = 0  # number of cars already placed in phase 1

        # Maze-phase bookkeeping.
        self.MAX_STEPS = 50  # upper bound on the number of phase-2 steps
        self.step_count = 0
        self.region_centers = []
        self.rectangles = {}  # (row, col) -> per-region data
        self.reverse_rectangles = {}  # region centre -> (row, col)
        self.car_pos = [(self.H / 2, self.W / 2)
                        for _ in range(self.num_cars)]
        self.car_traj = [[] for _ in range(self.num_cars)]
        self.current_car_index = 0

    def reset(self, seed=None, options=None):
        """Return the environment to the start of the partition phase.

        Args:
            seed: Forwarded to ``gym.Env.reset`` to seed the base-class RNG.
            options: Unused.

        Returns:
            The initial observation (all zeros).

        NOTE(review): Gymnasium's ``reset`` contract is ``(observation, info)``;
        only the observation is returned here, as before, so that existing
        callers keep working.
        """
        super().reset(seed=seed)

        self.phase = 0
        self.partition_step = 0
        self.partition_values = np.zeros(self.CUT_NUM, dtype=np.float32)
        self.col_cuts = []
        self.row_cuts = []
        self.init_maze_step = 0
        self.region_centers = []
        self.step_count = 0
        self.rectangles = {}
        self.reverse_rectangles = {}
        self.car_pos = [(self.H / 2, self.W / 2)
                        for _ in range(self.num_cars)]
        self.car_traj = [[] for _ in range(self.num_cars)]
        self.current_car_index = 0

        # Partition values followed by zero padding for the car positions.
        return self._observation(with_positions=False)

    def step(self, action):
        """Advance the environment by one action.

        Args:
            action: A length-1 sequence/array (a bare scalar is also
                accepted) holding a value in [0, 1].

        Returns:
            ``(state, reward, terminated, truncated, info)``; ``truncated`` is
            always ``False`` and ``info`` is always an empty dict.

        Raises:
            ValueError: If ``self.phase`` is not 0, 1 or 2.
        """
        a = self._scalar_action(action)
        if self.phase == 0:
            return self._step_partition(a)
        if self.phase == 1:
            return self._step_init_maze(a)
        if self.phase == 2:
            return self._step_maze(a)
        raise ValueError(f"unknown phase: {self.phase!r}")

    @staticmethod
    def _scalar_action(action):
        """Extract the single action value from a sequence or a bare scalar."""
        try:
            return float(action[0])
        except (TypeError, IndexError):
            # Bare Python number or 0-d array.
            return float(action)

    def _observation(self, with_positions):
        """Build the float32 state: partition values, then car positions.

        When ``with_positions`` is false the position slots are zero-filled,
        which is how the state looks during the partition phase.
        """
        positions = np.asarray(self.car_pos, dtype=np.float32).reshape(-1)
        if not with_positions:
            positions = np.zeros_like(positions)
        return np.concatenate([self.partition_values, positions])

    def _step_partition(self, a):
        """Phase 0: record one cut value; after the last one build the grid."""
        # Discretise to {0, 0.1, ..., 0.9}.
        disc_val = np.clip(np.floor(a * 10) / 10.0, 0.0, 0.9)
        self.partition_values[self.partition_step] = disc_val
        self.partition_step += 1

        if self.partition_step < self.CUT_NUM:
            # Still partitioning: no reward, episode continues.
            return self._observation(with_positions=False), 0.0, False, False, {}

        # All cuts chosen: drop zeros ("no cut"), de-duplicate and sort.
        half = len(self.partition_values) // 2
        vertical_cuts = sorted(
            {v for v in self.partition_values[:half] if v > 0})
        horizontal_cuts = sorted(
            {v for v in self.partition_values[half:] if v > 0})

        # The boundaries always include 0 and 1.
        self.col_cuts = [0.0] + vertical_cuts + [1.0]
        self.row_cuts = [0.0] + horizontal_cuts + [1.0]

        if not self._build_rectangles():
            # Infeasible partition: penalise and end the episode.
            return self._observation(with_positions=False), -10000, True, False, {}

        # Enter phase 1 (maze initialisation).
        self.phase = 1
        # Reverse index (centre -> grid cell) used by the maze phase.
        self.reverse_rectangles = {
            v['center']: k for k, v in self.rectangles.items()}
        return self._observation(with_positions=True), 10, False, False, {}

    def _build_rectangles(self):
        """Fill ``self.rectangles`` for every grid cell.

        Returns:
            ``False`` as soon as a cell has a negative energy-limited
            offloading ratio (cells processed before it remain stored),
            ``True`` otherwise.
        """
        # Independent of the cell, so computed once.
        rho_time_limit = (self.flight_time_factor - self.trans_time_factor) / \
            (self.comp_time_factor - self.trans_time_factor)

        row_bounds = zip(self.row_cuts, self.row_cuts[1:])
        for i, (top, bottom) in enumerate(row_bounds):
            col_bounds = zip(self.col_cuts, self.col_cuts[1:])
            for j, (left, right) in enumerate(col_bounds):
                # Area of the cell.
                d = (right - left) * self.W * (bottom - top) * self.H
                rho_energy_limit = (
                    self.battery_energy_capacity
                    - self.flight_energy_factor * d
                    - self.trans_energy_factor * d
                ) / (self.comp_energy_factor * d - self.trans_energy_factor * d)
                if rho_energy_limit < 0:
                    return False

                # Task offloading ratio of this cell.
                rho = min(rho_time_limit, rho_energy_limit)
                self.rectangles[(i, j)] = {
                    'center': ((top + bottom) * self.H / 2,
                               (right + left) * self.W / 2),
                    'flight_time': self.flight_time_factor * d,
                    'bs_time': self.bs_time_factor * (1 - rho) * d,
                    'is_visited': False,
                }
        return True

    def _step_init_maze(self, a):
        """Phase 1: place the next car at the centre of the selected region."""
        n_cols = len(self.col_cuts) - 1
        n_rows = len(self.row_cuts) - 1
        num_regions = n_cols * n_rows

        # Map a in [0, 1] onto a region index in 0..num_regions-1.
        target_region_index = int(
            np.clip(np.floor(a * num_regions), 0, num_regions - 1))
        # Row-major index -> (row, col).
        coord = divmod(target_region_index, n_cols)

        self.car_pos[self.init_maze_step] = self.rectangles[coord]['center']
        self.car_traj[self.init_maze_step].append(coord)
        self.rectangles[coord]['is_visited'] = True
        self.init_maze_step += 1

        if self.init_maze_step >= self.num_cars:
            # Every car is placed: enter phase 2 (maze walk).
            self.phase = 2
        return self._observation(with_positions=True), 0.0, False, False, {}

    def _step_maze(self, a):
        """Phase 2: move the current car one cell and check for termination."""
        current_car = self.current_car_index
        # Look up the grid cell the current car is standing on.
        current_row, current_col = \
            self.reverse_rectangles[self.car_pos[current_car]]

        # Map the continuous action onto up / down / left / right / stay.
        if a < 0.2:
            d_row, d_col = -1, 0
        elif a < 0.4:
            d_row, d_col = 1, 0
        elif a < 0.6:
            d_row, d_col = 0, -1
        elif a < 0.8:
            d_row, d_col = 0, 1
        else:
            d_row, d_col = 0, 0

        new_row, new_col = current_row + d_row, current_col + d_col
        inside = (0 <= new_row <= len(self.row_cuts) - 2
                  and 0 <= new_col <= len(self.col_cuts) - 2)
        if not inside:
            # Illegal move: the car stays where it is.
            # TODO: add a penalty for illegal moves.
            new_row, new_col = current_row, current_col

        self.car_pos[current_car] = self.rectangles[(new_row, new_col)]['center']
        if (new_row, new_col) != (current_row, current_col):
            self.car_traj[current_car].append((new_row, new_col))
        self.step_count += 1
        self.current_car_index = (self.current_car_index + 1) % self.num_cars

        # Mark the cell the car ended up on as visited.
        self.rectangles[(new_row, new_col)]['is_visited'] = True

        state = self._observation(with_positions=True)
        reward = 0

        # The episode ends when every cell is visited or the step limit is hit.
        all_visited = all(
            value['is_visited'] for value in self.rectangles.values())
        done = all_visited or self.step_count >= self.MAX_STEPS
        if all_visited:
            # Full coverage: the slowest motorcade determines the reward.
            T = max(self._compute_motorcade_time(idx)
                    for idx in range(self.num_cars))
            reward += self.BASE_LINE / T * 100
        elif done:
            # Step limit reached without full coverage.
            reward += -10000

        return state, reward, done, False, {}

    def _compute_motorcade_time(self, idx):
        """Return the execution time of motorcade ``idx`` along its trajectory.

        The result is the larger of (car travel time + summed flight time) and
        the summed ``bs_time`` over every cell in the trajectory. Car travel
        starts and ends at the centre of the whole area.
        """
        cells = [self.rectangles[tuple(point)]
                 for point in self.car_traj[idx]]
        flight_time = sum(cell['flight_time'] for cell in cells)
        bs_time = sum(cell['bs_time'] for cell in cells)

        centers = [cell['center'] for cell in cells]
        depot = [self.H / 2, self.W / 2]

        car_time = 0
        for first_point, second_point in zip(centers, centers[1:]):
            car_time += math.dist(first_point, second_point) * \
                self.car_time_factor
        # Legs from the area centre to the first cell and back from the last.
        car_time += math.dist(centers[0], depot) * self.car_time_factor
        car_time += math.dist(centers[-1], depot) * self.car_time_factor

        return max(float(car_time) + flight_time, bs_time)

    def render(self):
        """Print a short textual summary of the current phase (1 or 2 only)."""
        if self.phase == 1:
            print("Phase 1: Initialize maze environment.")
            print(f"Partition values so far: {self.partition_values}")
            print(f"Motorcade positon: {self.car_pos}")
        elif self.phase == 2:
            print("Phase 2: Play maze.")
            print(f'Motorcade trajectory: {self.car_traj}')