import math

import gymnasium as gym
import numpy as np
import yaml
from gymnasium import spaces


class PartitionMazeEnv(gym.Env):
    """Two-stage environment: area partitioning followed by coverage routing.

    Phase 0 (partition): ``CUT_NUM`` steps, one scalar per step. The first
    half of the steps choose the vertical cuts (c1, c2), the second half the
    horizontal cuts (r1, r2). Each value is one of {0, 0.1, ..., 0.9}, where
    0 means "no cut".

    Phase 1 (maze initialisation): transient. It happens inside the last
    partition step: the regions are built and every car is placed on one of
    the region centers closest to the center of the whole area.

    Phase 2 (maze): the cars take turns moving up/down/left/right (or
    staying) on the region grid until every region has been visited or
    ``MAX_STEPS`` is reached.

    Observation layout (float32, length ``1 + CUT_NUM + max_regions``):
    ``[phase, partition_values..., visit_status...]`` where ``visit_status``
    is 0/1 for existing regions and 100 for padding slots.
    """

    # Actions 0..9 are cut ratios; 10..13 are up/down/left/right; 14 is stay.
    _NUM_CUT_ACTIONS = 10
    _NUM_MOVE_ACTIONS = 5
    # Action code -> (row delta, column delta) on the region grid.
    _MOVES = {10: (-1, 0), 11: (1, 0), 12: (0, -1), 13: (0, 1)}
    # Marker written into visit-status slots that have no matching region.
    _PAD_VALUE = 100

    def __init__(self, config=None):
        """Load the fleet parameters and set up the spaces.

        Args:
            config: Currently unused; parameters are always read from
                ``params.yml`` in the current working directory.
        """
        super().__init__()
        # Fleet / cost-model parameters.
        with open('params.yml', 'r', encoding='utf-8') as file:
            params = yaml.safe_load(file)
        self.H = params['H']
        self.W = params['W']
        self.num_cars = params['num_cars']
        self.flight_time_factor = params['flight_time_factor']
        self.comp_time_factor = params['comp_time_factor']
        self.trans_time_factor = params['trans_time_factor']
        self.car_time_factor = params['car_time_factor']
        self.bs_time_factor = params['bs_time_factor']
        self.flight_energy_factor = params['flight_energy_factor']
        self.comp_energy_factor = params['comp_energy_factor']
        self.trans_energy_factor = params['trans_energy_factor']
        self.battery_energy_capacity = params['battery_energy_capacity']

        ##############################
        # Hyper-parameters that may need manual tuning
        ##############################
        self.CUT_NUM = 4        # half of the cuts are vertical, half horizontal
        self.BASE_LINE = 4000   # reference time (from a greedy / Monte-Carlo run)
        self.MAX_STEPS = 50     # step limit for the maze phase

        # One shared discrete action space for all phases.
        self.action_space = spaces.Discrete(
            self._NUM_CUT_ACTIONS + self._NUM_MOVE_ACTIONS)
        self.observation_space = spaces.Box(
            low=0.0, high=100.0,
            shape=(1 + self.CUT_NUM + self._max_regions(),),
            dtype=np.float32)

        self._reset_episode_state()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _max_regions(self):
        """Return the largest number of regions the cuts can produce."""
        return (self.CUT_NUM // 2 + 1) ** 2

    def _reset_episode_state(self):
        """Put every per-episode variable back to its initial value."""
        # 0: partition, 1: maze initialisation (transient), 2: maze.
        self.phase = 0
        self.partition_step = 0
        # Stores c1, c2, r1, r2 (0 for "not decided yet" or "no cut").
        self.partition_values = np.zeros(self.CUT_NUM, dtype=np.float32)
        self.col_cuts = []   # vertical boundaries, filled after phase 0
        self.row_cuts = []   # horizontal boundaries, filled after phase 0
        self.init_maze_step = 0
        self.region_centers = []
        self.step_count = 0
        self.rectangles = {}
        self.car_pos = [(self.H / 2, self.W / 2)
                        for _ in range(self.num_cars)]
        self.car_traj = [[] for _ in range(self.num_cars)]
        self.current_car_index = 0
        self.cars_moved = [False] * self.num_cars

    def _make_state(self, visit_status=None):
        """Assemble the observation vector.

        The result is cast to float32: concatenating the integer phase with
        float32 arrays would otherwise promote the observation to float64,
        which does not match ``observation_space``.
        """
        if visit_status is None:
            visit_status = np.zeros(self._max_regions(), dtype=np.float32)
        state = np.concatenate(
            [[self.phase], self.partition_values, visit_status])
        return state.astype(np.float32)

    def _visit_status(self):
        """Return the per-region visited flags, padded with ``_PAD_VALUE``."""
        n_rows = len(self.row_cuts) - 1
        n_cols = len(self.col_cuts) - 1
        status = np.full(self._max_regions(), self._PAD_VALUE,
                         dtype=np.float32)
        for i in range(n_rows):
            for j in range(n_cols):
                status[i * n_cols + j] = float(
                    self.rectangles[(i, j)]['is_visited'])
        return status

    def _build_rectangles(self):
        """Create the region table from the current cuts.

        Returns:
            True when every region is feasible, False as soon as one region
            has a negative energy-limited offloading ratio. In the latter
            case ``self.rectangles`` is left partially filled.
        """
        # Independent of the region, so computed once.
        rho_time_limit = (self.flight_time_factor - self.trans_time_factor) / \
            (self.comp_time_factor - self.trans_time_factor)

        row_spans = list(zip(self.row_cuts, self.row_cuts[1:]))
        col_spans = list(zip(self.col_cuts, self.col_cuts[1:]))
        for i, (top, bottom) in enumerate(row_spans):
            for j, (left, right) in enumerate(col_spans):
                # Region area.
                d = (right - left) * self.W * (bottom - top) * self.H
                rho_energy_limit = (
                    self.battery_energy_capacity
                    - self.flight_energy_factor * d
                    - self.trans_energy_factor * d
                ) / (self.comp_energy_factor * d
                     - self.trans_energy_factor * d)
                if rho_energy_limit < 0:
                    return False
                rho = min(rho_time_limit, rho_energy_limit)
                self.rectangles[(i, j)] = {
                    'center': ((top + bottom) * self.H / 2,
                               (right + left) * self.W / 2),
                    'flight_time': self.flight_time_factor * d,
                    'bs_time': self.bs_time_factor * (1 - rho) * d,
                    'is_visited': False,
                }
        return True

    def _place_cars(self):
        """Start each car on one of the regions nearest to the area center.

        When there are more cars than regions, the surplus cars are parked on
        an already assigned region without claiming it, so their trajectory
        starts empty instead of raising ``IndexError``.
        """
        origin = (self.H / 2, self.W / 2)
        ranked = [
            (i, j, self.rectangles[(i, j)]['center'])
            for i in range(len(self.row_cuts) - 1)
            for j in range(len(self.col_cuts) - 1)
        ]
        # Nearest region first.
        ranked.sort(key=lambda entry: math.dist(entry[2], origin))

        for car in range(self.num_cars):
            i, j, center = ranked[car % len(ranked)]
            self.car_pos[car] = center
            if car < len(ranked):
                self.car_traj[car].append((i, j))
                self.rectangles[(i, j)]['is_visited'] = True

    # ------------------------------------------------------------------
    # Gym API
    # ------------------------------------------------------------------
    def reset(self, seed=None, options=None):
        """Reset all variables and return to the partition phase (phase 0).

        Returns:
            The initial observation only.

        NOTE(review): the Gymnasium API expects ``(observation, info)`` here;
        the bare observation is kept so existing callers keep working.
        """
        super().reset(seed=seed)
        self._reset_episode_state()
        return self._make_state()

    def step(self, action):
        """Advance the environment by one discrete action.

        Args:
            action: Integer in ``[0, 15)``. 0..9 select a cut ratio, 10..13
                move the current car up/down/left/right, 14 keeps it in place.

        Returns:
            ``(state, reward, terminated, truncated, info)``.

        Raises:
            RuntimeError: if the environment is in a phase that accepts no
                actions (only possible if ``phase`` was changed externally).
        """
        action = int(action)
        if self.phase == 0:
            return self._step_partition(action)
        if self.phase == 2:
            return self._step_maze(action)
        raise RuntimeError(
            f"step() called in unsupported phase {self.phase}")

    def _step_partition(self, action):
        """Record one cut; on the last cut build the regions and place cars."""
        # Movement actions carry no meaning while partitioning. Mapping them
        # to "no cut" keeps every ratio inside {0, 0.1, ..., 0.9}; otherwise
        # ratios >= 1.0 would create zero- or negative-area regions.
        if 0 <= action < self._NUM_CUT_ACTIONS:
            cut_ratio = action * 0.1
        else:
            cut_ratio = 0.0
        self.partition_values[self.partition_step] = cut_ratio
        self.partition_step += 1

        # Still partitioning: no reward, episode continues.
        if self.partition_step < self.CUT_NUM:
            return self._make_state(), 0.0, False, False, {}

        # All cuts chosen: drop zeros, de-duplicate, sort, and add the outer
        # boundaries 0 and 1.
        half = len(self.partition_values) // 2
        vertical = sorted(
            {v for v in self.partition_values[:half] if v > 0})
        horizontal = sorted(
            {v for v in self.partition_values[half:] if v > 0})
        self.col_cuts = [0.0] + vertical + [1.0]
        self.row_cuts = [0.0] + horizontal + [1.0]

        if not self._build_rectangles():
            # Infeasible partition: heavy penalty and end of episode.
            return self._make_state(), -10000, True, False, {}

        # Phase 1: initialise the maze.
        self.phase = 1
        # Reverse index (center -> grid cell) used to locate cars later.
        self.reverse_rectangles = {
            rect['center']: cell for cell, rect in self.rectangles.items()}
        self._place_cars()

        # Phase 2: play the maze.
        self.phase = 2
        return self._make_state(self._visit_status()), 10, False, False, {}

    def _step_maze(self, action):
        """Move the current car by one cell and score the episode if it ends."""
        reward = 0
        car = self.current_car_index
        row, col = self.reverse_rectangles[self.car_pos[car]]

        # Moves that would leave the grid, cut actions, and the explicit
        # "stay" action all keep the car in place.
        d_row, d_col = self._MOVES.get(action, (0, 0))
        new_row, new_col = row + d_row, col + d_col
        inside = (0 <= new_row < len(self.row_cuts) - 1
                  and 0 <= new_col < len(self.col_cuts) - 1)
        if not inside:
            new_row, new_col = row, col
        car_moved = (new_row, new_col) != (row, col)

        self.car_pos[car] = self.rectangles[(new_row, new_col)]['center']
        if car_moved:
            self.car_traj[car].append((new_row, new_col))
        self.rectangles[(new_row, new_col)]['is_visited'] = True

        # Track whether any car moved during the current round.
        if car == 0:
            self.cars_moved = [False] * self.num_cars
        self.cars_moved[car] = car_moved
        if car == self.num_cars - 1 and not any(self.cars_moved):
            reward -= 10  # a whole round in which nobody moved

        self.step_count += 1
        self.current_car_index = (car + 1) % self.num_cars

        state = self._make_state(self._visit_status())

        # The episode ends when every region is visited or the step limit is
        # hit.
        # NOTE(review): the step limit is reported as ``terminated`` rather
        # than ``truncated``; kept as-is for callers that only check the
        # third return value.
        all_visited = all(
            rect['is_visited'] for rect in self.rectangles.values())
        done = all_visited or self.step_count >= self.MAX_STEPS
        if all_visited:
            # Coverage complete: score by the slowest motorcade.
            T = max(self._compute_motorcade_time(idx)
                    for idx in range(self.num_cars))
            reward += self.BASE_LINE / T * 1000
        elif done:
            reward += -1000

        return state, reward, done, False, {}

    def _compute_motorcade_time(self, idx):
        """Return the completion time of car ``idx`` along its trajectory.

        The time is ``max(car travel time + flight time, bs time)``, where
        travel starts and ends at the center of the whole area. A car with an
        empty trajectory contributes 0.0.
        """
        trajectory = self.car_traj[idx]
        if not trajectory:
            return 0.0

        flight_time = sum(
            self.rectangles[tuple(cell)]['flight_time'] for cell in trajectory)
        bs_time = sum(
            self.rectangles[tuple(cell)]['bs_time'] for cell in trajectory)

        origin = [self.H / 2, self.W / 2]
        car_time = 0
        # Travel between consecutive region centers.
        for here, there in zip(trajectory, trajectory[1:]):
            car_time += math.dist(self.rectangles[here]['center'],
                                  self.rectangles[there]['center']) * \
                self.car_time_factor
        # Legs from the area center to the first region and back from the last.
        car_time += math.dist(self.rectangles[trajectory[0]]['center'],
                              origin) * self.car_time_factor
        car_time += math.dist(self.rectangles[trajectory[-1]]['center'],
                              origin) * self.car_time_factor

        return max(float(car_time) + flight_time, bs_time)

    def render(self):
        """Print a short textual summary for phases 1 and 2."""
        if self.phase == 1:
            print("Phase 1: Initialize maze environment.")
            print(f"Partition values so far: {self.partition_values}")
            print(f"Motorcade positon: {self.car_pos}")
        elif self.phase == 2:
            print("Phase 2: Play maze.")
            print(f'Motorcade trajectory: {self.car_traj}')