HPCC2025/PPO/env.py
2025-03-12 11:33:35 +08:00

283 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import gymnasium as gym
from gymnasium import spaces
import numpy as np
import yaml
import math
class PartitionMazeEnv(gym.Env):
    """Partition-then-route environment driven by a 1-D continuous action.

    An episode walks through three phases (``self.phase``):

    * Phase 0 - region partition. Four steps, one scalar per step, taken in
      the order c1, c2 (vertical cuts) then r1, r2 (horizontal cuts). Every
      scalar is discretised to {0, 0.1, ..., 0.9}; 0 means "no cut".
    * Phase 1 - maze initialisation. One step per car; the action selects
      the grid cell in which that car starts.
    * Phase 2 - maze walk. Cars take turns moving up/down/left/right (or
      staying) on the grid until every cell has been visited or
      ``MAX_STEPS`` moves have been made.

    The observation is ``[c1, c2, r1, r2, car0_row, car0_col, ...]``. During
    phase 0 the car entries are reported as zeros.
    """

    def __init__(self, config=None):
        """Load the model parameters and declare the action/observation spaces.

        Args:
            config: Accepted for API compatibility; it is not read. All
                parameters come from ``params.yml`` in the current working
                directory.
        """
        super().__init__()
        # Motorcade / cost-model parameters.
        with open('params.yml', 'r', encoding='utf-8') as file:
            params = yaml.safe_load(file)
        self.H = params['H']
        self.W = params['W']
        self.num_cars = params['num_cars']
        self.flight_time_factor = params['flight_time_factor']
        self.comp_time_factor = params['comp_time_factor']
        self.trans_time_factor = params['trans_time_factor']
        self.car_time_factor = params['car_time_factor']
        self.bs_time_factor = params['bs_time_factor']
        self.flight_energy_factor = params['flight_energy_factor']
        self.comp_energy_factor = params['comp_energy_factor']
        self.trans_energy_factor = params['trans_energy_factor']
        self.battery_energy_capacity = params['battery_energy_capacity']

        # Phase control: 0 = partition, 1 = maze initialisation, 2 = maze walk.
        self.phase = 0
        self.partition_step = 0  # number of partition decisions taken so far
        # TODO the number of cuts is currently fixed to 4 (2 + 2)
        self.partition_values = np.zeros(
            4, dtype=np.float32)  # stores c1, c2, r1, r2

        # Every action, in every phase, is a single continuous value in [0, 1].
        self.action_space = spaces.Box(
            low=0.0, high=1.0, shape=(1,), dtype=np.float32)

        # TODO the returned state currently only contains position coordinates
        # The first 4 entries are the partition values (<= 0.9). The remaining
        # entries are car (row, col) grid indices, which reach the number of
        # cuts per axis (2 cuts -> 3 cells -> max index 2), so the upper bound
        # for those entries must be larger than 1.
        cuts_per_axis = len(self.partition_values) // 2
        obs_high = np.concatenate([
            np.ones(len(self.partition_values), dtype=np.float32),
            np.full(2 * self.num_cars, float(cuts_per_axis), dtype=np.float32),
        ])
        self.observation_space = spaces.Box(
            low=np.zeros_like(obs_high), high=obs_high, dtype=np.float32)

        # Partition-phase state.
        self.vertical_cuts = []    # vertical cut positions (zeros filtered out)
        self.horizontal_cuts = []  # horizontal cut positions (zeros filtered out)
        self.v_boundaries = [0.0, 1.0]
        self.h_boundaries = [0.0, 1.0]
        self.init_maze_step = 0
        self.region_centers = []

        # Routing-phase state.
        self.MAX_STEPS = 50      # cap on the number of maze moves
        self.BASE_LINE = 3400.0  # baseline time (obtained via greedy / Monte Carlo)
        self.step_count = 0
        self.rectangles = {}
        self.car_pos = [[0.5, 0.5] for _ in range(self.num_cars)]
        self.car_traj = [[] for _ in range(self.num_cars)]
        self.current_car_index = 0

    def _get_obs(self, include_cars=True):
        """Return the float32 observation vector.

        Args:
            include_cars: When False the car entries are reported as zeros
                (used while the partition is still being decided).
        """
        if include_cars:
            cars = np.asarray(self.car_pos, dtype=np.float32).ravel()
        else:
            cars = np.zeros(2 * self.num_cars, dtype=np.float32)
        # Force float32 so the observation matches ``observation_space``.
        return np.concatenate([self.partition_values, cars]).astype(
            np.float32, copy=False)

    def reset(self, seed=None, options=None):
        """Return the environment to the start of phase 0.

        Returns:
            ``(state, info)`` where ``state`` is all zeros and ``info`` is an
            empty dict.
        """
        # Let the base class seed its RNG as the Gymnasium API expects.
        super().reset(seed=seed)
        self.phase = 0
        self.partition_step = 0
        self.partition_values = np.zeros(4, dtype=np.float32)
        self.vertical_cuts = []
        self.horizontal_cuts = []
        self.v_boundaries = [0.0, 1.0]
        self.h_boundaries = [0.0, 1.0]
        self.init_maze_step = 0
        self.region_centers = []
        self.step_count = 0
        self.rectangles = {}
        self.car_pos = [[0.5, 0.5] for _ in range(self.num_cars)]
        self.car_traj = [[] for _ in range(self.num_cars)]
        self.current_car_index = 0
        return self._get_obs(include_cars=False), {}

    def step(self, action):
        """Advance the episode by one decision.

        Args:
            action: Array-like whose first element is the scalar action.

        Returns:
            ``(state, reward, terminated, truncated, info)``. ``truncated`` is
            always False; hitting ``MAX_STEPS`` is reported via ``terminated``.
        """
        # Only the first component of the action is used, in every phase.
        a = float(np.asarray(action, dtype=np.float64).reshape(-1)[0])
        if self.phase == 0:
            return self._step_partition(a)
        if self.phase == 1:
            return self._step_init_maze(a)
        if self.phase == 2:
            return self._step_maze(a)
        raise ValueError(f"unknown phase: {self.phase}")

    def _step_partition(self, a):
        """Phase 0: record one cut value; after the last one build the grid."""
        # Discretise to {0, 0.1, ..., 0.9}.
        disc_val = np.clip(np.floor(a * 10) / 10.0, 0.0, 0.9)
        self.partition_values[self.partition_step] = disc_val
        self.partition_step += 1

        # Still deciding cuts: no reward, episode continues.
        if self.partition_step < len(self.partition_values):
            return self._get_obs(include_cars=False), 0.0, False, False, {}

        # All cuts decided: drop zeros ("no cut"), de-duplicate and sort.
        half = len(self.partition_values) // 2
        self.vertical_cuts = sorted(
            {v for v in self.partition_values[:half] if v > 0})
        self.horizontal_cuts = sorted(
            {v for v in self.partition_values[half:] if v > 0})
        # Boundaries always include 0 and 1.
        v_boundaries = [0.0] + self.vertical_cuts + [1.0]
        h_boundaries = [0.0] + self.horizontal_cuts + [1.0]

        rectangles = self._build_rectangles(v_boundaries, h_boundaries)
        if rectangles is None:
            # Some cell cannot be served within the battery budget.
            return self._get_obs(include_cars=False), -100.0, True, False, {}

        self.rectangles = rectangles
        # Keep the boundaries for the grid mapping of the later phases.
        self.v_boundaries = v_boundaries
        self.h_boundaries = h_boundaries
        self.phase = 1
        return self._get_obs(), 10.0, False, False, {}

    def _build_rectangles(self, v_boundaries, h_boundaries):
        """Compute per-cell costs; return None if any cell is infeasible.

        Cells are keyed ``(row, col)`` where ``row`` indexes the horizontal
        boundaries and ``col`` the vertical ones.
        """
        # Does not depend on the cell, so it is computed once.
        rho_time_limit = (self.flight_time_factor - self.trans_time_factor) / \
            (self.comp_time_factor - self.trans_time_factor)
        rectangles = {}
        for i, (h_lo, h_hi) in enumerate(zip(h_boundaries, h_boundaries[1:])):
            for j, (v_lo, v_hi) in enumerate(zip(v_boundaries, v_boundaries[1:])):
                # Cell area = width * height (both are boundary differences).
                d = (v_hi - v_lo) * self.W * (h_hi - h_lo) * self.H
                rho_energy_limit = (
                    self.battery_energy_capacity
                    - self.flight_energy_factor * d
                    - self.trans_energy_factor * d
                ) / (self.comp_energy_factor * d - self.trans_energy_factor * d)
                if rho_energy_limit < 0:
                    return None
                # NOTE(review): rho is not clamped to [0, 1] - confirm this
                # is intended by the cost model.
                rho = min(rho_time_limit, rho_energy_limit)
                rectangles[(i, j)] = {
                    # Midpoint of the cell along each axis.
                    'center': ((h_lo + h_hi) * self.H / 2,
                               (v_lo + v_hi) * self.W / 2),
                    'flight_time': self.flight_time_factor * d,
                    'bs_time': self.bs_time_factor * (1 - rho) * d,
                    'is_visited': False
                }
        return rectangles

    def _step_init_maze(self, a):
        """Phase 1: place the next car on the grid cell selected by ``a``."""
        num_cols = len(self.v_boundaries) - 1
        num_rows = len(self.h_boundaries) - 1
        num_regions = num_rows * num_cols
        # Map a in [0, 1] to a cell index in 0..num_regions-1.
        target_region_index = int(np.floor(a * num_regions))
        target_region_index = min(max(target_region_index, 0), num_regions - 1)
        # Convert the flat index to (row, col).
        coord = list(divmod(target_region_index, num_cols))
        self.car_pos[self.init_maze_step] = coord
        self.car_traj[self.init_maze_step].append(list(coord))
        self.rectangles[tuple(coord)]['is_visited'] = True
        self.init_maze_step += 1
        if self.init_maze_step >= self.num_cars:
            # Every car is placed: start walking the maze.
            self.phase = 2
        return self._get_obs(), 0.0, False, False, {}

    def _step_maze(self, a):
        """Phase 2: move the current car one cell and check for termination."""
        current_car = self.current_car_index
        current_row, current_col = self.car_pos[current_car]
        max_row = len(self.h_boundaries) - 2
        max_col = len(self.v_boundaries) - 2

        # Map the scalar action to a direction; a >= 0.8 means "stay".
        # Moves that would leave the grid keep the car in place.
        # TODO add a penalty for illegal moves
        new_row, new_col = current_row, current_col
        if a < 0.2:      # up
            new_row = min(current_row + 1, max_row)
        elif a < 0.4:    # down
            new_row = max(current_row - 1, 0)
        elif a < 0.6:    # left
            new_col = max(current_col - 1, 0)
        elif a < 0.8:    # right
            new_col = min(current_col + 1, max_col)

        self.car_pos[current_car] = [new_row, new_col]
        # Only real moves extend the trajectory.
        if new_row != current_row or new_col != current_col:
            self.car_traj[current_car].append([new_row, new_col])
        self.step_count += 1
        self.current_car_index = (self.current_car_index + 1) % self.num_cars
        self.rectangles[(new_row, new_col)]['is_visited'] = True

        state = self._get_obs()
        # The episode ends when every cell is visited or the step cap is hit.
        all_visited = all(
            cell['is_visited'] for cell in self.rectangles.values())
        timed_out = self.step_count >= self.MAX_STEPS
        reward = 0.0
        if all_visited:
            # Full coverage: the makespan is the slowest motorcade.
            T = max(self._compute_motorcade_time(idx)
                    for idx in range(self.num_cars))
            print(T)
            print(self.car_traj)
            reward += -(T - self.BASE_LINE)
        elif timed_out:
            reward += -100
        return state, reward, all_visited or timed_out, False, {}

    def _compute_motorcade_time(self, idx):
        """Return the completion time of motorcade ``idx``.

        The result is ``max(car_time + flight_time, bs_time)`` where the
        flight and base-station times are summed over every trajectory entry
        and the car time covers the route depot -> cells -> depot.
        """
        cells = [self.rectangles[tuple(point)] for point in self.car_traj[idx]]
        flight_time = sum(cell['flight_time'] for cell in cells)
        bs_time = sum(cell['bs_time'] for cell in cells)
        # The car leaves from and returns to the centre of the whole region,
        # expressed in the same (H, W) units as the cell centres.
        depot = (self.H / 2, self.W / 2)
        waypoints = [depot, *(cell['center'] for cell in cells), depot]
        car_distance = sum(
            math.dist(p, q) for p, q in zip(waypoints, waypoints[1:]))
        car_time = car_distance * self.car_time_factor
        return max(float(car_time) + flight_time, bs_time)

    def render(self):
        """Print a short textual summary of the current phase."""
        if self.phase == 1:
            print("Phase 1: Initialize maze environment.")
            print(f"Partition values so far: {self.partition_values}")
            print(f"Motorcade positon: {self.car_pos}")
        elif self.phase == 2:
            print("Phase 2: Play maze.")
            print(f'Motorcade trajectory: {self.car_traj}')