PPO能够跑起来了

This commit is contained in:
weixin_46229132 2025-03-11 19:43:04 +08:00
parent 4474a33cba
commit 3818343085

View File

@ -1,6 +1,8 @@
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import yaml
import math
class PartitionMazeEnv(gym.Env):
@ -16,22 +18,23 @@ class PartitionMazeEnv(gym.Env):
def __init__(self, config=None):
super(PartitionMazeEnv, self).__init__()
# 车队参数设置
self.H = 20 # 区域高度网格点之间的距离为25m单位距离
self.W = 30 # 区域宽度
self.num_cars = 2 # 系统数量(车-巢-机系统个数)
with open('params.yml', 'r', encoding='utf-8') as file:
params = yaml.safe_load(file)
# 时间系数(单位:秒,每个网格一张照片)
self.flight_time_factor = 3 # 每张照片对应的飞行时间无人机飞行速度为9.5m/s拍摄照片的时间间隔为3s
self.comp_uav_factor = 5 # 无人机上每张照片计算时间5s
self.trans_time_factor = 0.3 # 每张照片传输时间0.3s
self.car_move_time_factor = 2 * 50 # TODO 汽车每单位距离的移动时间2s加了一个放大因子
self.comp_bs_factor = 5 # 机巢上每张照片计算时间
self.H = params['H']
self.W = params['W']
self.num_cars = params['num_cars']
# 能耗参数
self.flight_energy_factor = 0.05 # 单位:分钟/张
self.comp_energy_factor = 0.05 # 计算能耗需要重新估计
self.trans_energy_factor = 0.0025
self.battery_capacity = 10 # 无人机只进行飞行续航为30分钟
self.flight_time_factor = params['flight_time_factor']
self.comp_time_factor = params['comp_time_factor']
self.trans_time_factor = params['trans_time_factor']
self.car_time_factor = params['car_time_factor']
self.bs_time_factor = params['bs_time_factor']
self.flight_energy_factor = params['flight_energy_factor']
self.comp_energy_factor = params['comp_energy_factor']
self.trans_energy_factor = params['trans_energy_factor']
self.battery_energy_capacity = params['battery_energy_capacity']
self.phase = 0 # 阶段控制0区域划分阶段1迷宫初始化阶段2走迷宫阶段
self.partition_step = 0 # 区域划分阶段步数,范围 0~4
@ -48,16 +51,17 @@ class PartitionMazeEnv(gym.Env):
# 阶段 0 状态:前 4 维表示已决策的切分值(未决策部分为 0
# 阶段 1 状态:车辆位置 (2D)
self.observation_space = spaces.Box(
low=0.0, high=1.0, shape=(8,), dtype=np.float32)
low=0.0, high=1.0, shape=(4 + 2 * self.num_cars,), dtype=np.float32)
# 切分阶段相关变量
self.vertical_cuts = [] # 存储竖切位置c₁, c₂当值为0时表示不切
self.horizontal_cuts = [] # 存储横切位置r₁, r₂
# TODO region_centers可不可以优化一下减少一些参数
self.region_centers = [] # 存储切分后每个子区域的中心点(归一化坐标)
self.init_maze_step = 0
# 路径规划阶段相关变量
self.MAX_STEPS = 50 # 迷宫走法步数上限
self.BASE_LINE = 2750.0 # 基准时间通过greedy或者蒙特卡洛计算出来
self.step_count = 0
self.rectangles = {}
self.car_pos = [[0.5, 0.5] for _ in range(self.num_cars)]
@ -71,6 +75,7 @@ class PartitionMazeEnv(gym.Env):
self.partition_values = np.zeros(4, dtype=np.float32)
self.vertical_cuts = []
self.horizontal_cuts = []
self.init_maze_step = 0
self.region_centers = []
self.step_count = 0
self.rectangles = {}
@ -121,8 +126,8 @@ class PartitionMazeEnv(gym.Env):
d = (v_boundaries[j+1] - v_boundaries[j]) * self.W * \
(h_boundaries[i] + h_boundaries[i+1]) * self.H
rho_time_limit = (self.flight_time_factor - self.trans_time_factor) / \
(self.comp_uav_factor - self.trans_time_factor)
rho_energy_limit = (self.battery_capacity - self.flight_energy_factor * d - self.trans_energy_factor * d) / \
(self.comp_time_factor - self.trans_time_factor)
rho_energy_limit = (self.battery_energy_capacity - self.flight_energy_factor * d - self.trans_energy_factor * d) / \
(self.comp_energy_factor * d -
self.trans_energy_factor * d)
if rho_energy_limit < 0:
@ -131,20 +136,12 @@ class PartitionMazeEnv(gym.Env):
rho = min(rho_time_limit, rho_energy_limit)
flight_time = self.flight_time_factor * d
comp_time = self.comp_uav_factor * rho * d
trans_time = self.trans_time_factor * (1 - rho) * d
comp_bs_time = self.comp_bs_factor * (1 - rho) * d
bs_time = self.bs_time_factor * (1 - rho) * d
self.rectangles[(i, j)] = {
# 'r1': h_boundaries[i], 'r2': h_boundaries[i+1], 'c1': v_boundaries[j], 'c2': v_boundaries[j+1],
'd': d,
'rho': rho,
'flight_time': flight_time,
'comp_time': comp_time,
'trans_time': trans_time,
'comp_bs_time': comp_bs_time,
'bs_time': bs_time,
'is_visited': False
# 'center': (center_r, center_c)
}
if not valid_partition:
break
@ -157,25 +154,11 @@ class PartitionMazeEnv(gym.Env):
else:
reward = 10
# 进入阶段 1迷宫
# 进入阶段 1初始化迷宫
self.phase = 1
# 根据分割边界计算每个子区域中心
self.region_centers = []
for i in range(len(h_boundaries) - 1):
for j in range(len(v_boundaries) - 1):
center_x = (
v_boundaries[j] + v_boundaries[j+1]) / 2.0
center_y = (
h_boundaries[i] + h_boundaries[i+1]) / 2.0
self.region_centers.append((center_x, center_y))
# 存储切分边界,供后续网格映射使用
self.v_boundaries = v_boundaries
self.h_boundaries = h_boundaries
# 初始化迷宫阶段:步数清零,建立 visited_grid 大小与网格数相同
self.step_count = 0
self.visited_grid = np.zeros(
(len(v_boundaries) - 1) * (len(h_boundaries) - 1), dtype=np.int32)
state = np.concatenate(
[self.partition_values, np.array(self.car_pos).flatten()])
return state, reward, False, False, {}
@ -183,33 +166,33 @@ class PartitionMazeEnv(gym.Env):
elif self.phase == 1:
# 阶段 1初始化迷宫让多个车辆从区域中心出发前往划分区域的中心点
# 确保 action 的值在 [0, 1],然后映射到 0~(num_regions-1) 的索引
num_regions = len(self.region_centers)
num_regions = (len(self.v_boundaries) - 1) * \
(len(self.h_boundaries) - 1)
target_region_index = int(np.floor(a * num_regions))
target_region_index = np.clip(
target_region_index, 0, num_regions - 1)
# 将index映射到笛卡尔坐标
coord = [target_region_index // (len(self.v_boundaries) - 1),
target_region_index % (len(self.v_boundaries) - 1)]
self.car_pos[self.init_maze_step] = coord
self.car_traj[self.init_maze_step].append(coord)
self.rectangles[tuple(coord)]['is_visited'] = True
# 遍历所有车辆,让它们依次移动到目标子区域
for car_idx in range(self.num_cars):
target_position = np.array(
self.region_centers[target_region_index]) # 目标区域中心
# 更新该车辆位置
self.car_pos[car_idx] = target_position
# 累计步数
self.step_count += 1
self.car_traj[car_idx].append(target_position) # 记录每辆车的轨迹
# 进入阶段 2走迷宫
self.phase = 2
# 观察状态
# 计数
self.init_maze_step += 1
state = np.concatenate(
[self.partition_values, np.array(self.car_pos).flatten()])
if self.init_maze_step < self.num_cars:
return state, 0.0, False, False, {}
else:
# 进入阶段 2走迷宫
self.phase = 2
return state, 0.0, False, False, {}
elif self.phase == 2:
# 阶段 2路径规划走迷宫
current_car = self.current_car_index
current_row, current_col = self.car_pos[current_car]
# 当前动作 a 为 1 维连续动作,映射到四个方向
if a < 0.2:
@ -223,18 +206,16 @@ class PartitionMazeEnv(gym.Env):
else:
move_dir = 'stay'
current_row, current_col = self.car_pos[current_car]
# 初始化新的行、列为当前值
new_row, new_col = current_row, current_col
if move_dir == 'up' and current_row < len(h_boundaries) - 1:
if move_dir == 'up' and current_row < len(self.h_boundaries) - 2:
new_row = current_row + 1
elif move_dir == 'down' and current_row > 0:
new_row = current_row - 1
elif move_dir == 'left' and current_col > 0:
new_col = current_col - 1
elif move_dir == 'right' and current_col < len(v_boundaries) - 1:
elif move_dir == 'right' and current_col < len(self.v_boundaries) - 2:
new_col = current_col + 1
# 如果移动不合法或者动作为stay则保持原位置
# TODO 移动不合法,加一些惩罚
@ -242,47 +223,49 @@ class PartitionMazeEnv(gym.Env):
# 更新车辆位置
self.car_pos[current_car] = [new_row, new_col]
if new_row != current_row or new_col != current_col:
self.car_traj[current_car].append(np.array(new_row, new_col))
self.car_traj[current_car].append([new_row, new_col])
self.step_count += 1
self.current_car_index = (
self.current_car_index + 1) % self.num_cars
# 更新访问标记:将新网格标记为已访问
self.rectangles[(new_col, new_col)]['is_visited'] = True
self.rectangles[(new_row, new_col)]['is_visited'] = True
# 观察状态
state = np.concatenate(
[self.partition_values, np.array(self.car_pos).flatten()])
reward = 0
# Episode 终止条件:所有网格均被访问或步数达到上限
done = all([rec['is_visited'] for rec in self.rectangles]) or (
done = all([value['is_visited'] for _, value in self.rectangles.items()]) or (
self.step_count >= self.MAX_STEPS)
if done and np.all(self.visited_grid == 1):
if done and all([value['is_visited'] for _, value in self.rectangles.items()]):
# 区域覆盖完毕,根据轨迹计算各车队的执行时间
T = max([self._compute_motorcade_time(idx)
for idx in range(self.num_cars)])
reward += 10.0 # TODO 奖励与greedy比较
reward += -(T - self.BASE_LINE)
elif done and self.step_count >= self.MAX_STEPS:
reward -= 100
reward += -100
return state, reward, done, False, {}
def _compute_motorcade_time(self, idx):
flight_time = sum(self.rectangles[point]['flight_time']
flight_time = sum(self.rectangles[tuple(point)]['flight_time']
for point in self.car_traj[idx])
bs_time = sum(self.rectangles[point]['comp_bs_time']
bs_time = sum(self.rectangles[tuple(point)]['bs_time']
for point in self.car_traj[idx])
# 计算车的移动时间,首先在轨迹的首尾添加上大区域中心
car_time = 0
self.car_traj[idx].append([0.5, 0.5])
self.car_traj[idx].insert(0, [0.5, 0.5])
for i in range(len(self.car_traj[idx])):
for i in range(len(self.car_traj[idx]) - 1):
first_point = self.car_traj[idx][i]
second_point = self.car_traj[idx][i + 1]
car_time += np.linalg.norm(first_point, second_point) * \
self.H * self.W * self.car_move_time_factor
car_time += math.dist(first_point, second_point) * \
self.H * self.W * self.car_time_factor
return max(car_time + flight_time, bs_time)
return max(float(car_time) + flight_time, bs_time)
def render(self):
if self.phase == 0:
@ -291,5 +274,4 @@ class PartitionMazeEnv(gym.Env):
print(f"Partition values so far: {self.partition_values}")
elif self.phase == 1:
print("Phase 1: Path planning (maze).")
print(f"Visited grid: {self.visited_grid}")
print(f"Step count: {self.step_count}")