import gymnasium as gym
from gymnasium import spaces
import numpy as np


class PartitionMazeEnv(gym.Env):
    """Region-partition and coverage-routing environment with three phases.

    Phase 0 (partitioning, 4 steps): each step emits one scalar that is
    discretised to {0, 0.1, ..., 0.9}; 0 means "no cut". The order is
    c1, c2 (vertical cuts) followed by r1, r2 (horizontal cuts).

    Phase 1 (deployment, 1 step): the action selects one sub-region and every
    car is moved from the depot at (0.5, 0.5) to the centre of that region.

    Phase 2 (maze): cars take turns moving up/down/left/right/stay on the grid
    of sub-regions until every sub-region has been visited or ``MAX_STEPS``
    is reached.

    Every action is a 1-D continuous value in [0, 1]. The observation is the
    four partition values followed by the normalised (x, y) position of each
    car.
    """

    # Upper action thresholds of the phase-2 moves and their (d_row, d_col).
    # Any action >= 0.8 means "stay".
    _MOVES = (
        (0.2, (1, 0)),    # up
        (0.4, (-1, 0)),   # down
        (0.6, (0, -1)),   # left
        (0.8, (0, 1)),    # right
    )

    def __init__(self, config=None):
        """Create the environment.

        Args:
            config: Currently unused; kept so existing callers keep working.
        """
        super().__init__()

        # Fleet parameters.
        self.H = 20  # region height; grid points are 25 m apart (unit distance)
        self.W = 30  # region width
        self.num_cars = 2  # number of car-nest-UAV systems

        # Time coefficients (seconds; one photo per grid cell).
        # Flight time per photo: the UAV flies at 9.5 m/s and shoots every 3 s.
        self.flight_time_factor = 3
        self.comp_uav_factor = 5  # on-UAV computation time per photo, 5 s
        self.trans_time_factor = 0.3  # transmission time per photo, 0.3 s
        # TODO: car travel time per unit distance, 2 s, times a scale-up factor.
        self.car_move_time_factor = 2 * 50
        self.comp_bs_factor = 5  # computation time per photo on the nest

        # Energy parameters.
        self.flight_energy_factor = 0.05  # unit: minutes per photo
        self.comp_energy_factor = 0.05  # computation energy, to be re-estimated
        self.trans_energy_factor = 0.0025
        # Original note: a UAV that only flies has 30 minutes of endurance.
        self.battery_capacity = 10

        # All actions are 1-D continuous values in [0, 1].
        self.action_space = spaces.Box(
            low=0.0, high=1.0, shape=(1,), dtype=np.float32)

        # Observation: 4 partition values + (x, y) per car.
        # TODO: the state currently only carries positions.
        self.observation_space = spaces.Box(
            low=0.0, high=1.0, shape=(4 + 2 * self.num_cars,),
            dtype=np.float32)

        self.MAX_STEPS = 50  # step limit for the maze phase

        self._reset_episode_state()

    def _reset_episode_state(self):
        """Initialise every per-episode variable (shared by __init__ and reset)."""
        # Phase control: 0 partitioning, 1 deployment, 2 maze.
        self.phase = 0
        self.partition_step = 0  # number of partition decisions made, 0..4
        # TODO: the number of cuts is fixed at 4 (2 vertical + 2 horizontal).
        self.partition_values = np.zeros(4, dtype=np.float32)  # c1, c2, r1, r2

        self.vertical_cuts = []  # sorted, de-duplicated non-zero vertical cuts
        self.horizontal_cuts = []  # sorted, de-duplicated non-zero horizontal cuts
        self.v_boundaries = [0.0, 1.0]
        self.h_boundaries = [0.0, 1.0]
        # TODO: region_centers could probably be simplified.
        self.region_centers = []  # row-major list of (center_x, center_y)

        self.step_count = 0
        self.rectangles = {}  # (row, col) -> per-region statistics
        self.visited_grid = np.zeros(0, dtype=np.int32)
        self.car_pos = [[0.5, 0.5] for _ in range(self.num_cars)]
        self.car_cells = [None for _ in range(self.num_cars)]  # (row, col)
        self.car_traj = [[] for _ in range(self.num_cars)]  # visited cells
        self.current_car_index = 0

    def _get_obs(self, include_cars=True):
        """Return the float32 observation vector.

        Args:
            include_cars: When False the car-position slots are zero-filled,
                which is what the partition phase reports.
        """
        if include_cars:
            cars = np.asarray(self.car_pos, dtype=np.float32).reshape(-1)
        else:
            cars = np.zeros(2 * self.num_cars, dtype=np.float32)
        return np.concatenate(
            [self.partition_values, cars]).astype(np.float32)

    def reset(self, seed=None, options=None):
        """Reset to the start of the partition phase.

        Returns:
            (observation, info): the all-zero observation and an empty dict.
        """
        # Lets gymnasium seed self.np_random as its API prescribes.
        super().reset(seed=seed)
        self._reset_episode_state()
        return self._get_obs(include_cars=False), {}

    def step(self, action):
        """Advance the environment by one decision.

        Args:
            action: A 1-element array-like (or scalar); only the first value
                is used.

        Returns:
            (observation, reward, terminated, truncated, info). ``truncated``
            is always False; hitting ``MAX_STEPS`` is reported through
            ``terminated``.
        """
        a = float(np.asarray(action, dtype=np.float64).reshape(-1)[0])
        if self.phase == 0:
            return self._step_partition(a)
        if self.phase == 1:
            return self._step_deploy(a)
        if self.phase == 2:
            return self._step_maze(a)
        raise ValueError(f"Unknown phase: {self.phase}")

    @staticmethod
    def _unique_cuts(values):
        """Return the sorted, de-duplicated non-zero cut positions."""
        # Values are multiples of 0.1 stored as float32; rounding removes the
        # float32 representation noise before they are used as boundaries.
        return sorted({round(float(v), 1) for v in values if v > 0})

    def _step_partition(self, a):
        """Phase 0: record one discretised cut; finalise after the 4th."""
        disc_val = float(np.clip(np.floor(a * 10) / 10.0, 0.0, 0.9))
        self.partition_values[self.partition_step] = disc_val
        self.partition_step += 1

        # Still partitioning: no reward, episode continues.
        if self.partition_step < len(self.partition_values):
            return self._get_obs(include_cars=False), 0.0, False, False, {}

        half = len(self.partition_values) // 2
        self.vertical_cuts = self._unique_cuts(self.partition_values[:half])
        self.horizontal_cuts = self._unique_cuts(self.partition_values[half:])
        # Boundaries always include 0 and 1.
        self.v_boundaries = [0.0] + self.vertical_cuts + [1.0]
        self.h_boundaries = [0.0] + self.horizontal_cuts + [1.0]

        if not self._build_rectangles():
            # Some region cannot be served within the battery budget.
            return self._get_obs(include_cars=False), -100.0, True, False, {}

        n_rows = len(self.h_boundaries) - 1
        n_cols = len(self.v_boundaries) - 1
        self.region_centers = [
            self.rectangles[(i, j)]['center']
            for i in range(n_rows) for j in range(n_cols)
        ]
        self.step_count = 0
        self.visited_grid = np.zeros(n_rows * n_cols, dtype=np.int32)
        self.phase = 1
        return self._get_obs(), 10.0, False, False, {}

    def _build_rectangles(self):
        """Fill ``self.rectangles`` with per-region workload statistics.

        Returns:
            False as soon as a region's energy-limited offloading ratio is
            negative (invalid partition), True otherwise.
        """
        v, h = self.v_boundaries, self.h_boundaries
        # The time-limited offloading ratio does not depend on the region.
        rho_time_limit = (self.flight_time_factor - self.trans_time_factor) / \
            (self.comp_uav_factor - self.trans_time_factor)

        for i in range(len(h) - 1):
            # Height is the DIFFERENCE of consecutive boundaries.
            height = (h[i + 1] - h[i]) * self.H
            for j in range(len(v) - 1):
                d = (v[j + 1] - v[j]) * self.W * height  # workload (cells)
                rho_energy_limit = (
                    self.battery_capacity
                    - self.flight_energy_factor * d
                    - self.trans_energy_factor * d
                ) / (self.comp_energy_factor * d - self.trans_energy_factor * d)
                if rho_energy_limit < 0:
                    return False

                rho = min(rho_time_limit, rho_energy_limit)
                self.rectangles[(i, j)] = {
                    'd': d,
                    'rho': rho,
                    'flight_time': self.flight_time_factor * d,
                    'comp_time': self.comp_uav_factor * rho * d,
                    'trans_time': self.trans_time_factor * (1 - rho) * d,
                    'comp_bs_time': self.comp_bs_factor * (1 - rho) * d,
                    'is_visited': False,
                    # Normalised (x, y) centre of the region.
                    'center': ((v[j] + v[j + 1]) / 2.0,
                               (h[i] + h[i + 1]) / 2.0),
                }
        return True

    def _mark_visited(self, cell):
        """Flag ``cell`` as visited in both ``rectangles`` and ``visited_grid``."""
        row, col = cell
        n_cols = len(self.v_boundaries) - 1
        self.rectangles[cell]['is_visited'] = True
        self.visited_grid[row * n_cols + col] = 1

    def _step_deploy(self, a):
        """Phase 1: send every car from the depot to one selected region."""
        num_regions = len(self.region_centers)
        # Map [0, 1] onto a region index 0 .. num_regions - 1.
        target = int(np.clip(np.floor(a * num_regions), 0, num_regions - 1))
        n_cols = len(self.v_boundaries) - 1
        cell = divmod(target, n_cols)  # region_centers is row-major
        center_x, center_y = self.region_centers[target]

        for car_idx in range(self.num_cars):
            self.car_cells[car_idx] = cell
            self.car_pos[car_idx] = [center_x, center_y]
            self.step_count += 1
            self.car_traj[car_idx].append(cell)
        # The cars now stand in this region, so it counts as covered.
        self._mark_visited(cell)

        self.phase = 2
        return self._get_obs(), 0.0, False, False, {}

    def _step_maze(self, a):
        """Phase 2: move the current car one cell and check termination."""
        car = self.current_car_index
        row, col = self.car_cells[car]
        n_rows = len(self.h_boundaries) - 1
        n_cols = len(self.v_boundaries) - 1

        d_row, d_col = 0, 0  # default: stay
        for upper, delta in self._MOVES:
            if a < upper:
                d_row, d_col = delta
                break

        new_row, new_col = row + d_row, col + d_col
        # An illegal move leaves the car where it is.
        # TODO: penalise illegal moves.
        if not (0 <= new_row < n_rows and 0 <= new_col < n_cols):
            new_row, new_col = row, col

        new_cell = (new_row, new_col)
        if new_cell != (row, col):
            self.car_cells[car] = new_cell
            self.car_pos[car] = list(self.rectangles[new_cell]['center'])
            # NOTE(review): a revisited cell is appended again, so its service
            # time is counted again in _compute_motorcade_time - confirm.
            self.car_traj[car].append(new_cell)

        self.step_count += 1
        self.current_car_index = (self.current_car_index + 1) % self.num_cars
        self._mark_visited(new_cell)

        all_visited = all(
            rec['is_visited'] for rec in self.rectangles.values())
        done = all_visited or self.step_count >= self.MAX_STEPS

        reward = 0.0
        info = {}
        if all_visited:
            # Coverage finished: the slowest motorcade determines the makespan.
            info['motorcade_time'] = max(
                self._compute_motorcade_time(idx)
                for idx in range(self.num_cars))
            reward += 10.0  # TODO: compare the reward against a greedy baseline
        elif done:
            reward -= 100.0
        return self._get_obs(), reward, done, False, info

    def _compute_motorcade_time(self, idx):
        """Return the completion time of car ``idx`` along its trajectory.

        The time is ``max(car travel + UAV flight, nest computation)``. Travel
        starts and ends at the depot (0.5, 0.5) and passes through the centre
        of every cell in the trajectory. The stored trajectory is not modified.
        """
        cells = [tuple(cell) for cell in self.car_traj[idx]]
        flight_time = sum(self.rectangles[c]['flight_time'] for c in cells)
        bs_time = sum(self.rectangles[c]['comp_bs_time'] for c in cells)

        depot = (0.5, 0.5)
        waypoints = [depot]
        waypoints.extend(self.rectangles[c]['center'] for c in cells)
        waypoints.append(depot)

        car_time = 0.0
        for (x0, y0), (x1, y1) in zip(waypoints, waypoints[1:]):
            # Convert the normalised offsets to grid units on each axis.
            distance = float(np.hypot((x1 - x0) * self.W, (y1 - y0) * self.H))
            car_time += distance * self.car_move_time_factor
        return max(car_time + flight_time, bs_time)

    def render(self):
        """Print a short text summary of the current phase."""
        if self.phase == 0:
            print("Phase 0: Partitioning.")
            print(f"Partition step: {self.partition_step}")
            print(f"Partition values so far: {self.partition_values}")
        elif self.phase == 1:
            print("Phase 1: Path planning (maze).")
            print(f"Visited grid: {self.visited_grid}")
            print(f"Step count: {self.step_count}")
        elif self.phase == 2:
            print("Phase 2: Path planning (maze).")
            print(f"Visited grid: {self.visited_grid}")
            print(f"Step count: {self.step_count}")