import math

import gymnasium as gym
import numpy as np
import yaml
from gymnasium import spaces


class PartitionMazeEnv(gym.Env):
    """Two-stage environment: region partitioning followed by grid routing.

    Phase 0 (partition): takes ``CUT_NUM`` steps. Each step consumes one
    scalar action that is discretised to {0, 0.1, ..., 0.9}; 0 means
    "no cut". The first half of the values are vertical (column) cuts and
    the second half are horizontal (row) cuts.

    Phase 1 (maze initialisation): happens inside the last partition step.
    The rectangles are built and every car is placed on a start region.

    Phase 2 (maze): cars take turns moving up/down/left/right (or staying)
    on the grid of rectangles until every rectangle has been visited or
    ``MAX_STEPS`` moves have been made.

    The observation is ``CUT_NUM`` partition values followed by
    ``(CUT_NUM // 2 + 1) ** 2`` visit flags.
    """

    def __init__(self, config=None):
        """Load the fleet parameters from ``params.yml`` and build the spaces.

        Args:
            config: Accepted for interface compatibility.
                NOTE(review): this argument is currently never read.
        """
        super().__init__()

        # Fleet / cost-model parameters.
        with open('params.yml', 'r', encoding='utf-8') as file:
            params = yaml.safe_load(file)
        self.H = params['H']
        self.W = params['W']
        self.num_cars = params['num_cars']
        self.flight_time_factor = params['flight_time_factor']
        self.comp_time_factor = params['comp_time_factor']
        self.trans_time_factor = params['trans_time_factor']
        self.car_time_factor = params['car_time_factor']
        self.bs_time_factor = params['bs_time_factor']
        self.flight_energy_factor = params['flight_energy_factor']
        self.comp_energy_factor = params['comp_energy_factor']
        self.trans_energy_factor = params['trans_energy_factor']
        self.battery_energy_capacity = params['battery_energy_capacity']

        ##############################
        # Hyper-parameters that may need manual tuning
        ##############################
        self.CUT_NUM = 4  # half of the cuts are vertical, half horizontal
        self.BASE_LINE = 4000  # reference time (from greedy / Monte Carlo)
        self.MAX_STEPS = 50  # upper bound on the number of maze moves

        # Every action, in every phase, is a single continuous value in [0, 1].
        self.action_space = spaces.Box(
            low=0.0, high=1.0, shape=(1,), dtype=np.float32)

        # Observation: CUT_NUM partition values + one slot per possible region.
        # NOTE(review): slots of regions that do not exist are filled with 100
        # (see _visit_state), which lies outside the declared [0, 1] bounds.
        self.observation_space = spaces.Box(
            low=0.0, high=1.0,
            shape=(self.CUT_NUM + self._max_regions(),), dtype=np.float32)

        self._reset_episode_state()

    def _max_regions(self):
        """Return the largest number of regions the cuts can produce."""
        return (self.CUT_NUM // 2 + 1) ** 2

    def _reset_episode_state(self):
        """Put every per-episode attribute back to its initial value."""
        # 0: partition phase, 1: maze initialisation, 2: maze phase.
        self.phase = 0
        self.partition_step = 0  # number of partition decisions taken so far
        self.partition_values = np.zeros(self.CUT_NUM, dtype=np.float32)
        self.col_cuts = []  # vertical cut boundaries
        self.row_cuts = []  # horizontal cut boundaries
        self.init_maze_step = 0
        self.region_centers = []
        self.step_count = 0
        self.rectangles = {}  # (row, col) -> region info dict
        self.reverse_rectangles = {}  # region center -> (row, col)
        self.car_pos = [(self.H / 2, self.W / 2)
                        for _ in range(self.num_cars)]
        self.car_traj = [[] for _ in range(self.num_cars)]
        self.current_car_index = 0

    def _blank_state(self):
        """Return the observation with all visit flags set to zero."""
        return np.concatenate([
            self.partition_values,
            np.zeros(self._max_regions(), dtype=np.float32)
        ])

    def _visit_state(self):
        """Return the observation carrying the current visit flags.

        Existing regions are stored row-major as 0/1 flags; the remaining
        slots (regions that do not exist for this partition) are set to 100.
        """
        visit_status = np.zeros(self._max_regions(), dtype=np.float32)
        n_rows = len(self.row_cuts) - 1
        n_cols = len(self.col_cuts) - 1
        for i in range(n_rows):
            for j in range(n_cols):
                visit_status[i * n_cols + j] = float(
                    self.rectangles[(i, j)]['is_visited'])
        visit_status[n_rows * n_cols:] = 100
        return np.concatenate([self.partition_values, visit_status])

    def reset(self, seed=None, options=None):
        """Return to the partition phase and give back the initial observation.

        Args:
            seed: Optional seed, forwarded to ``gym.Env.reset``.
            options: Unused.

        Returns:
            The initial observation (all zeros).
            NOTE(review): the Gymnasium API returns ``(observation, info)``
            from ``reset``; only the observation is returned here so that
            existing callers keep working.
        """
        super().reset(seed=seed)
        self._reset_episode_state()
        return self._blank_state()

    def step(self, action):
        """Advance the environment by one decision.

        Args:
            action: Indexable whose first element is a value in [0, 1].

        Returns:
            ``(state, reward, terminated, truncated, info)``.

        Raises:
            RuntimeError: if the environment is in neither the partition
                phase nor the maze phase.
        """
        a = float(action[0])
        if self.phase == 0:
            return self._partition_step(a)
        if self.phase == 2:
            return self._maze_step(a)
        raise RuntimeError(f"step() called in unexpected phase {self.phase}")

    def _partition_step(self, a):
        """Record one cut decision; on the last one, build and enter the maze."""
        # Discretise the action to {0, 0.1, ..., 0.9}.
        disc_val = np.clip(np.floor(a * 10) / 10.0, 0.0, 0.9)
        self.partition_values[self.partition_step] = disc_val
        self.partition_step += 1

        # Still partitioning: no reward, episode continues.
        if self.partition_step < self.CUT_NUM:
            return self._blank_state(), 0.0, False, False, {}

        # All decisions taken: drop zeros, de-duplicate and sort the cuts.
        half = len(self.partition_values) // 2
        vertical_cuts = sorted(
            {v for v in self.partition_values[:half] if v > 0})
        horizontal_cuts = sorted(
            {v for v in self.partition_values[half:] if v > 0})
        # The boundaries always include 0 and 1.
        self.col_cuts = [0.0] + vertical_cuts + [1.0]
        self.row_cuts = [0.0] + horizontal_cuts + [1.0]

        if not self._build_rectangles():
            return self._blank_state(), -10000, True, False, {}

        # Phase 1: initialise the maze.
        self.phase = 1
        reward = 100
        # Reverse index: region center -> grid coordinates.
        self.reverse_rectangles = {
            info['center']: key for key, info in self.rectangles.items()}
        depot = (self.H / 2, self.W / 2)
        region_centers = [
            (i, j, self.rectangles[(i, j)]['center'])
            for i in range(len(self.row_cuts) - 1)
            for j in range(len(self.col_cuts) - 1)
        ]
        # Nearest regions to the area center come first.
        region_centers.sort(key=lambda item: math.dist(item[2], depot))
        # Give each car the nearest region. When there are fewer regions than
        # cars the assignment wraps around, so several cars share a start
        # region instead of indexing past the end of the list.
        for idx in range(self.num_cars):
            i, j, center = region_centers[idx % len(region_centers)]
            self.car_pos[idx] = center
            self.car_traj[idx].append((i, j))
            self.rectangles[(i, j)]['is_visited'] = True

        # Phase 2: play the maze.
        self.phase = 2
        return self._visit_state(), reward, False, False, {}

    def _build_rectangles(self):
        """Fill ``self.rectangles`` from the cuts; return False if invalid.

        A partition is invalid as soon as one region has a negative energy
        limit on its offloading ratio; regions built before that point are
        left in ``self.rectangles``.
        """
        self.rectangles = {}
        # Does not depend on the region, so it is computed once.
        rho_time_limit = (self.flight_time_factor - self.trans_time_factor) / \
            (self.comp_time_factor - self.trans_time_factor)
        for i in range(len(self.row_cuts) - 1):
            for j in range(len(self.col_cuts) - 1):
                # Region area.
                d = (self.col_cuts[j+1] - self.col_cuts[j]) * self.W * \
                    (self.row_cuts[i+1] - self.row_cuts[i]) * self.H
                rho_energy_limit = (
                    self.battery_energy_capacity
                    - self.flight_energy_factor * d
                    - self.trans_energy_factor * d) / \
                    (self.comp_energy_factor * d
                     - self.trans_energy_factor * d)
                if rho_energy_limit < 0:
                    return False
                rho = min(rho_time_limit, rho_energy_limit)
                self.rectangles[(i, j)] = {
                    'center': (
                        (self.row_cuts[i] + self.row_cuts[i+1]) * self.H / 2,
                        (self.col_cuts[j+1] + self.col_cuts[j]) * self.W / 2),
                    'flight_time': self.flight_time_factor * d,
                    'bs_time': self.bs_time_factor * (1 - rho) * d,
                    'is_visited': False
                }
        return True

    def _maze_step(self, a):
        """Move the current car one cell and hand the turn to the next car."""
        current_car = self.current_car_index
        # Look up which grid cell the current car is standing on.
        current_row, current_col = \
            self.reverse_rectangles[self.car_pos[current_car]]
        reward = 0

        # Map the continuous action onto up / down / left / right / stay.
        if a < 0.2:
            d_row, d_col = -1, 0
        elif a < 0.4:
            d_row, d_col = 1, 0
        elif a < 0.6:
            d_row, d_col = 0, -1
        elif a < 0.8:
            d_row, d_col = 0, 1
        else:
            d_row, d_col = 0, 0

        new_row, new_col = current_row + d_row, current_col + d_col
        inside = (0 <= new_row <= len(self.row_cuts) - 2
                  and 0 <= new_col <= len(self.col_cuts) - 2)
        if not inside:
            # A move off the grid leaves the car in place (no penalty).
            new_row, new_col = current_row, current_col

        self.car_pos[current_car] = self.rectangles[(new_row, new_col)]['center']
        if new_row != current_row or new_col != current_col:
            self.car_traj[current_car].append((new_row, new_col))
        self.step_count += 1
        self.current_car_index = (self.current_car_index + 1) % self.num_cars
        self.rectangles[(new_row, new_col)]['is_visited'] = True

        state = self._visit_state()

        # The episode ends when every region is visited or the budget is spent.
        all_visited = all(
            info['is_visited'] for info in self.rectangles.values())
        done = all_visited or self.step_count >= self.MAX_STEPS
        if all_visited:
            # Full coverage: score against the slowest motorcade.
            T = max(self._compute_motorcade_time(idx)
                    for idx in range(self.num_cars))
            reward += self.BASE_LINE / T * 1000
        elif done:
            reward += -1000
        return state, reward, done, False, {}

    def _compute_motorcade_time(self, idx):
        """Return the execution time of motorcade ``idx`` for its trajectory.

        The result is the larger of (car travel time + summed flight time)
        and the summed ``bs_time`` over the trajectory. Travel covers the leg
        from the area center to the first region, every leg between
        consecutive regions, and the leg from the last region back to the
        area center.
        """
        traj = self.car_traj[idx]
        flight_time = sum(
            self.rectangles[tuple(point)]['flight_time'] for point in traj)
        bs_time = sum(
            self.rectangles[tuple(point)]['bs_time'] for point in traj)

        depot = [self.H / 2, self.W / 2]
        car_time = 0
        for first_point, second_point in zip(traj, traj[1:]):
            car_time += math.dist(self.rectangles[first_point]['center'],
                                  self.rectangles[second_point]['center']) * \
                self.car_time_factor
        car_time += math.dist(
            self.rectangles[traj[0]]['center'], depot) * self.car_time_factor
        car_time += math.dist(
            self.rectangles[traj[-1]]['center'], depot) * self.car_time_factor

        return max(float(car_time) + flight_time, bs_time)

    def render(self):
        """Print a short textual summary for phase 1 or phase 2."""
        if self.phase == 1:
            print("Phase 1: Initialize maze environment.")
            print(f"Partition values so far: {self.partition_values}")
            print(f"Motorcade positon: {self.car_pos}")
        elif self.phase == 2:
            print("Phase 2: Play maze.")
            print(f'Motorcade trajectory: {self.car_traj}')