import numpy as np
import gym
from gym import spaces


class AllocationEnv(gym.Env):
    """Task-allocation environment (second optimization layer).

    Each action assigns every sub-area to one of ``num_systems`` systems.
    The cost of an allocation is the makespan — the maximum completion time
    over all systems — where each system's completion time is obtained from
    the third-layer route optimizer (``env_routing.RoutingEnv``).
    """

    def __init__(self, subareas, num_systems):
        """
        Args:
            subareas: list of sub-areas (task descriptors) to be allocated.
            num_systems: number of systems the sub-areas are split among.
        """
        super(AllocationEnv, self).__init__()
        self.subareas = subareas          # list of sub-areas
        self.num_systems = num_systems    # number of systems

        # Time coefficients
        self.flight_time_factor = 3       # flight time per photo
        self.comp_uav_factor = 5          # UAV on-board computation time
        self.trans_time_factor = 0.3      # transmission time
        self.car_move_time_factor = 100   # ground-vehicle movement time
        self.comp_bs_factor = 5           # base-station (nest) computation time

        # Energy parameters
        self.flight_energy_factor = 0.05  # flight energy consumption
        self.comp_energy_factor = 0.05    # computation energy consumption
        self.trans_energy_factor = 0.0025 # transmission energy consumption
        self.battery_capacity = 30        # battery capacity

        # Action space: which system each sub-area is assigned to.
        self.action_space = spaces.MultiDiscrete([num_systems] * len(subareas))

        # Observation space: current load (task count) of each system.
        self.observation_space = spaces.Box(
            low=np.zeros(num_systems),
            high=np.ones(num_systems) * float('inf'),
            dtype=np.float32,
        )

        self.state = None
        self.current_step = 0
        self.max_steps = 1000

    def calculate_rho(self, area):
        """Compute the optimal offloading ratio for a sub-area of size ``area``.

        Returns:
            The smaller of the time-limited and energy-limited ratios, or
            ``None`` when the energy-limited ratio is negative (i.e. the
            battery budget cannot cover the task at all).
        """
        rho_time_limit = (self.flight_time_factor - self.trans_time_factor) / \
                         (self.comp_uav_factor - self.trans_time_factor)
        rho_energy_limit = (self.battery_capacity
                            - self.flight_energy_factor * area
                            - self.trans_energy_factor * area) / \
                           (self.comp_energy_factor * area
                            - self.trans_energy_factor * area)
        if rho_energy_limit < 0:
            return None
        return min(rho_time_limit, rho_energy_limit)

    def step(self, action):
        """Apply an allocation and return ``(state, reward, done, info)``.

        Args:
            action: sequence of length ``len(self.subareas)`` giving, for each
                sub-area, the index of the system it is assigned to.
        """
        self.current_step += 1

        # Initialize the per-system task lists.
        system_tasks = {i: [] for i in range(self.num_systems)}

        # Distribute sub-areas to systems according to the action.
        for i, system_id in enumerate(action):
            system_tasks[system_id].append(self.subareas[i])

        # Completion time of every system.
        system_times = []
        valid_allocation = True

        for system_id, tasks in system_tasks.items():
            if not tasks:  # system received no tasks
                system_times.append(0)
                continue

            # Delegate to the third layer (route planning) for this system.
            from env_routing import RoutingEnv
            route_env = RoutingEnv(tasks)
            completion_time, valid = route_env.optimize()

            if not valid:
                valid_allocation = False
                break

            system_times.append(completion_time)

        total_time = max(system_times) if system_times else 0

        # Reward: large penalty for infeasible allocations, otherwise the
        # negative makespan (shorter is better).
        if not valid_allocation:
            reward = -10000
            done = True
        else:
            reward = -total_time
            done = self.current_step >= self.max_steps

        # Updated state: load (task count) of each system. Cast to float32 to
        # match the declared observation_space dtype.
        self.state = np.array([len(tasks) for tasks in system_tasks.values()],
                              dtype=np.float32)

        return self.state, reward, done, {}

    def reset(self):
        """Reset the environment; all systems start with zero load."""
        # float32 to match the declared observation_space dtype.
        self.state = np.zeros(self.num_systems, dtype=np.float32)
        self.current_step = 0
        return self.state

    def render(self, mode='human'):
        """Rendering is not implemented for this environment."""
        pass

    def optimize(self):
        """Search for a good allocation with a DQN agent.

        Returns:
            (best_time, valid_solution): the best makespan observed at the end
            of an episode, and whether at least one feasible allocation was
            found.
        """
        from dqn import Agent

        state_dim = self.observation_space.shape[0]
        # NOTE(review): action_dim is num_systems * len(subareas), but the
        # action space is MultiDiscrete and step() iterates the action as a
        # per-subarea sequence — confirm dqn.Agent emits that shape.
        action_dim = self.num_systems * len(self.subareas)
        agent = Agent(state_dim, action_dim)

        # Training parameters — fewer episodes because this is a sub-problem.
        episodes = 100
        max_steps = 100

        best_time = float('inf')
        valid_solution = False

        for episode in range(episodes):
            state = self.reset()

            for step in range(max_steps):
                action = agent.choose_action(state)
                next_state, reward, done, _ = self.step(action)

                agent.store_transition(state, action, reward, next_state, done)
                agent.learn()

                state = next_state

                if done:
                    if reward != -10000:  # feasible terminal allocation
                        valid_solution = True
                        best_time = min(best_time, -reward)
                    break

        return best_time, valid_solution