From f19e8fbdbf2c9685406ee4bc65bbae6608597f82 Mon Sep 17 00:00:00 2001 From: weixin_46229132 Date: Tue, 18 Mar 2025 21:16:48 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=A0=E5=85=A5dqn=E7=AE=97=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DQN/RL_brain.py | 254 ----------------------------------- DQN/run_this.py | 58 -------- Duel_Double_DQN/DQN.py | 144 ++++++++++++++++++++ Duel_Double_DQN/main.py | 163 +++++++++++++++++++++++ Duel_Double_DQN/utils.py | 28 ++++ env.py | 4 +- env_dis.py | 278 +++++++++++++++++++++++++++++++++++++++ 7 files changed, 615 insertions(+), 314 deletions(-) delete mode 100644 DQN/RL_brain.py delete mode 100644 DQN/run_this.py create mode 100644 Duel_Double_DQN/DQN.py create mode 100644 Duel_Double_DQN/main.py create mode 100644 Duel_Double_DQN/utils.py create mode 100644 env_dis.py diff --git a/DQN/RL_brain.py b/DQN/RL_brain.py deleted file mode 100644 index b1149b0..0000000 --- a/DQN/RL_brain.py +++ /dev/null @@ -1,254 +0,0 @@ -""" -Deep Q Network off-policy -""" -import torch -import torch.nn as nn -import numpy as np -import pandas as pd -import matplotlib.pyplot as plt - -np.random.seed(42) -torch.manual_seed(2) - - -class Network(nn.Module): - """ - Network Structure - """ - def __init__(self, - n_features, - n_actions, - n_neuron=10 - ): - super(Network, self).__init__() - self.net = nn.Sequential( - nn.Linear(in_features=n_features, out_features=n_neuron, bias=True), - nn.Linear(in_features=n_neuron, out_features=n_actions, bias=True), - nn.ReLU() - ) - - def forward(self, s): - """ - - :param s: s - :return: q - """ - q = self.net(s) - return q - - -class DeepQNetwork(nn.Module): - """ - Q Learning Algorithm - """ - def __init__(self, - n_actions, - n_features, - learning_rate=0.01, - reward_decay=0.9, - e_greedy=0.9, - replace_target_iter=300, - memory_size=500, - batch_size=32, - e_greedy_increment=None): - super(DeepQNetwork, self).__init__() - - self.n_actions = n_actions - self.n_features = n_features - self.lr = learning_rate - self.gamma = reward_decay - self.epsilon_max = e_greedy - self.replace_target_iter = replace_target_iter - self.memory_size = memory_size - self.batch_size = batch_size - self.epsilon_increment = e_greedy_increment - self.epsilon = 0 if e_greedy_increment is not None else self.epsilon_max - - # total learning step - self.learn_step_counter = 0 - - # initialize zero memory [s, a, r, s_] - # 这里用pd.DataFrame创建的表格作为memory - # 表格的行数是memory的大小,也就是transition的个数 - # 表格的列数是transition的长度,一个transition包含[s, a, r, s_],其中a和r分别是一个数字,s和s_的长度分别是n_features - self.memory = pd.DataFrame(np.zeros((self.memory_size, self.n_features*2+2))) - - # build two network: eval_net and target_net - self.eval_net = Network(n_features=self.n_features, n_actions=self.n_actions) - self.target_net = Network(n_features=self.n_features, n_actions=self.n_actions) - self.loss_function = nn.MSELoss() - self.optimizer = torch.optim.Adam(self.eval_net.parameters(), lr=self.lr) - - # 记录每一步的误差 - self.cost_his = [] - - - def store_transition(self, s, a, r, s_): - if not hasattr(self, 'memory_counter'): - # hasattr用于判断对象是否包含对应的属性。 - self.memory_counter = 0 - - transition = np.hstack((s, [a,r], s_)) - - # replace the old memory with new memory - index = self.memory_counter % self.memory_size - self.memory.iloc[index, :] = transition - - self.memory_counter += 1 - - def choose_action(self, observation): - observation = observation[np.newaxis, :] - - if np.random.uniform() < self.epsilon: - # forward feed the observation 
and get q value for every actions - s = torch.FloatTensor(observation) - actions_value = self.eval_net(s) - action = [np.argmax(actions_value.detach().numpy())][0] - else: - action = np.random.randint(0, self.n_actions) - return action - - def _replace_target_params(self): - # 复制网络参数 - self.target_net.load_state_dict(self.eval_net.state_dict()) - - def learn(self): - # check to replace target parameters - if self.learn_step_counter % self.replace_target_iter == 0: - self._replace_target_params() - print('\ntarget params replaced\n') - - # sample batch memory from all memory - batch_memory = self.memory.sample(self.batch_size) \ - if self.memory_counter > self.memory_size \ - else self.memory.iloc[:self.memory_counter].sample(self.batch_size, replace=True) - - # run the nextwork - s = torch.FloatTensor(batch_memory.iloc[:, :self.n_features].values) - s_ = torch.FloatTensor(batch_memory.iloc[:, -self.n_features:].values) - q_eval = self.eval_net(s) - q_next = self.target_net(s_) - - # change q_target w.r.t q_eval's action - q_target = q_eval.clone() - - # 更新值 - batch_index = np.arange(self.batch_size, dtype=np.int32) - eval_act_index = batch_memory.iloc[:, self.n_features].values.astype(int) - reward = batch_memory.iloc[:, self.n_features + 1].values - - q_target[batch_index, eval_act_index] = torch.FloatTensor(reward) + self.gamma * q_next.max(dim=1).values - - # train eval network - loss = self.loss_function(q_target, q_eval) - self.optimizer.zero_grad() - loss.backward() - self.optimizer.step() - - self.cost_his.append(loss.detach().numpy()) - - # increasing epsilon - self.epsilon = self.epsilon + self.epsilon_increment if self.epsilon < self.epsilon_max else self.epsilon_max - self.learn_step_counter += 1 - - def plot_cost(self): - plt.figure() - plt.plot(np.arange(len(self.cost_his)), self.cost_his) - plt.show() - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/DQN/run_this.py b/DQN/run_this.py deleted file mode 100644 index 8398068..0000000 --- a/DQN/run_this.py +++ /dev/null @@ -1,58 +0,0 @@ -from RL_brain import DeepQNetwork -import os -import sys -sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -from env import PartitionMazeEnv - -def run_maze(): - step = 0 # 为了记录走到第几步,记忆录中积累经验(也就是积累一些transition)之后再开始学习 - for episode in range(200): - # initial observation - observation = env.reset() - - while True: - # refresh env - env.render() - - # RL choose action based on observation - action = RL.choose_action(observation) - - # RL take action and get next observation and reward - observation_, reward, done = env.step(action) - - # !! restore transition - RL.store_transition(observation, action, reward, observation_) - - # 超过200条transition之后每隔5步学习一次 - if (step > 200) and (step % 5 == 0): - RL.learn() - - # swap observation - observation = observation_ - - # break while loop when end of this episode - if done: - break - step += 1 - - # end of game - print("game over") - env.destroy() - - -if __name__ == "__main__": - # maze game - env = PartitionMazeEnv() - - # TODO 代码还没有写完,跑不了!!! 
-    RL = DeepQNetwork(env.n_actions, env.n_features,
-                      learning_rate=0.01,
-                      reward_decay=0.9,
-                      e_greedy=0.9,
-                      replace_target_iter=200,
-                      memory_size=2000)
-    env.after(100, run_maze)
-    env.mainloop()
-    RL.plot_cost()
-
-
diff --git a/Duel_Double_DQN/DQN.py b/Duel_Double_DQN/DQN.py
new file mode 100644
index 0000000..044f284
--- /dev/null
+++ b/Duel_Double_DQN/DQN.py
@@ -0,0 +1,144 @@
+import torch.nn.functional as F
+import torch.nn as nn
+import numpy as np
+import torch
+import copy
+
+
+def build_net(layer_shape, activation, output_activation):
+    '''Build the network with a for loop'''
+    layers = []
+    for j in range(len(layer_shape)-1):
+        act = activation if j < len(layer_shape)-2 else output_activation
+        layers += [nn.Linear(layer_shape[j], layer_shape[j+1]), act()]
+    return nn.Sequential(*layers)
+
+class Q_Net(nn.Module):
+    def __init__(self, state_dim, action_dim, hid_shape):
+        super(Q_Net, self).__init__()
+        layers = [state_dim] + list(hid_shape) + [action_dim]
+        self.Q = build_net(layers, nn.ReLU, nn.Identity)
+    def forward(self, s):
+        q = self.Q(s)
+        return q
+
+
+class Duel_Q_Net(nn.Module):
+    def __init__(self, state_dim, action_dim, hid_shape):
+        super(Duel_Q_Net, self).__init__()
+        layers = [state_dim] + list(hid_shape)
+        self.hidden = build_net(layers, nn.ReLU, nn.ReLU)
+        self.V = nn.Linear(hid_shape[-1], 1)
+        self.A = nn.Linear(hid_shape[-1], action_dim)
+
+    def forward(self, s):
+        s = self.hidden(s)
+        Adv = self.A(s)
+        V = self.V(s)
+        Q = V + (Adv - torch.mean(Adv, dim=-1, keepdim=True))  # Q(s,a)=V(s)+A(s,a)-mean(A(s,a))
+        return Q
+
+
+class DQN_agent(object):
+    def __init__(self, **kwargs):
+        # Init hyperparameters for agent, just like "self.gamma = opt.gamma, self.lambd = opt.lambd, ..."
+        self.__dict__.update(kwargs)
+        self.tau = 0.005
+        self.replay_buffer = ReplayBuffer(self.state_dim, self.dvc, max_size=int(1e6))
+        if self.Duel:
+            self.q_net = Duel_Q_Net(self.state_dim, self.action_dim, (self.net_width, self.net_width)).to(self.dvc)
+        else:
+            self.q_net = Q_Net(self.state_dim, self.action_dim, (self.net_width, self.net_width)).to(self.dvc)
+        self.q_net_optimizer = torch.optim.Adam(self.q_net.parameters(), lr=self.lr)
+        self.q_target = copy.deepcopy(self.q_net)
+        # Freeze target networks with respect to optimizers (only update via polyak averaging)
+        for p in self.q_target.parameters(): p.requires_grad = False
+
+
+    def select_action(self, state, deterministic):  # only used when interacting with the env
+        with torch.no_grad():
+            state = torch.FloatTensor(state.reshape(1, -1)).to(self.dvc)
+            # if deterministic:
+            #     a = self.q_net(state).argmax().item()
+            # else:
+            if np.random.rand() < self.exp_noise:
+                if state[0][0] == 0:
+                    a = np.random.randint(0, 10)   # partition phase: cut actions 0~9
+                else:
+                    a = np.random.randint(10, 14)  # maze phase: move actions 10~13
+            else:
+                if state[0][0] == 0:
+                    q_value = self.q_net(state)
+                    q_value[0, 10:] = -float('inf')  # mask the move actions during the partition phase
+                    a = q_value.argmax().item()
+                else:
+                    q_value = self.q_net(state)
+                    q_value[0, :10] = -float('inf')  # mask the cut actions during the maze phase
+                    a = q_value.argmax().item()
+            return a
+
+
+    def train(self):
+        s, a, r, s_next, dw = self.replay_buffer.sample(self.batch_size)
+
+        '''Compute the target Q value'''
+        with torch.no_grad():
+            if self.Double:
+                argmax_a = self.q_net(s_next).argmax(dim=1).unsqueeze(-1)
+                max_q_next = self.q_target(s_next).gather(1, argmax_a)
+            else:
+                max_q_next = self.q_target(s_next).max(1)[0].unsqueeze(1)
+            target_Q = r + (~dw) * self.gamma * max_q_next  # dw: die or win
+
+        # Get current Q estimates
+        current_q = self.q_net(s)
+        current_q_a = current_q.gather(1, a)
+
+        q_loss = F.mse_loss(current_q_a, target_Q)
+        self.q_net_optimizer.zero_grad()
+        q_loss.backward()
+        self.q_net_optimizer.step()
+
+        # Update the frozen target models
+        for param, target_param in zip(self.q_net.parameters(), self.q_target.parameters()):
+            target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
+
+
+    def save(self, algo, EnvName, steps):
+        torch.save(self.q_net.state_dict(), "./model/{}_{}_{}.pth".format(algo, EnvName, steps))  # keep in sync with load() and the 'model' dir created in main.py
+
+    def load(self, algo, EnvName, steps):
+        self.q_net.load_state_dict(torch.load("./model/{}_{}_{}.pth".format(algo, EnvName, steps), map_location=self.dvc))
+        self.q_target.load_state_dict(torch.load("./model/{}_{}_{}.pth".format(algo, EnvName, steps), map_location=self.dvc))
+
+
+class ReplayBuffer(object):
+    def __init__(self, state_dim, dvc, max_size=int(1e6)):
+        self.max_size = max_size
+        self.dvc = dvc
+        self.ptr = 0
+        self.size = 0
+
+        self.s = torch.zeros((max_size, state_dim), dtype=torch.float, device=self.dvc)
+        self.a = torch.zeros((max_size, 1), dtype=torch.long, device=self.dvc)
+        self.r = torch.zeros((max_size, 1), dtype=torch.float, device=self.dvc)
+        self.s_next = torch.zeros((max_size, state_dim), dtype=torch.float, device=self.dvc)
+        self.dw = torch.zeros((max_size, 1), dtype=torch.bool, device=self.dvc)
+
+    def add(self, s, a, r, s_next, dw):
+        self.s[self.ptr] = torch.from_numpy(s).to(self.dvc)
+        self.a[self.ptr] = a
+        self.r[self.ptr] = r
+        self.s_next[self.ptr] = torch.from_numpy(s_next).to(self.dvc)
+        self.dw[self.ptr] = dw
+
+        self.ptr = (self.ptr + 1) % self.max_size
+        self.size = min(self.size + 1, self.max_size)
+
+    def sample(self, batch_size):
+        ind = torch.randint(0, self.size, device=self.dvc, size=(batch_size,))
+        return self.s[ind], self.a[ind], self.r[ind], self.s_next[ind], self.dw[ind]
+
+
+
+
diff --git a/Duel_Double_DQN/main.py b/Duel_Double_DQN/main.py
new file mode 100644
index 0000000..ce89e13
--- /dev/null
+++ b/Duel_Double_DQN/main.py
@@ -0,0 +1,163 @@
+import gymnasium as gym
+import os
+import shutil
+import argparse
+import torch
+import sys
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+from env_dis import PartitionMazeEnv
+from utils import evaluate_policy, str2bool
+from datetime import datetime
+from DQN import DQN_agent
+
+'''Hyperparameter Setting'''
+parser = argparse.ArgumentParser()
+parser.add_argument('--dvc', type=str, default='cpu',
+                    help='running device: cuda or cpu')
+parser.add_argument('--EnvIdex', type=int, default=0, help='CP-v1, LLd-v2')
+parser.add_argument('--write', type=str2bool, default=False,
+                    help='Use SummaryWriter to record the training')
+parser.add_argument('--render', type=str2bool,
+                    default=False, help='Render or Not')
+parser.add_argument('--Loadmodel', type=str2bool,
+                    default=False, help='Load pretrained model or Not')
+parser.add_argument('--ModelIdex', type=int, default=100,
+                    help='which model to load')
+
+parser.add_argument('--seed', type=int, default=42, help='random seed')
+parser.add_argument('--Max_train_steps', type=int,
+                    default=int(1e8), help='Max training steps')
+parser.add_argument('--save_interval', type=int,
+                    default=int(50e3), help='Model saving interval, in steps.')
+parser.add_argument('--eval_interval', type=int, default=int(2e3),
+                    help='Model evaluating interval, in steps.')
+parser.add_argument('--random_steps', type=int, default=int(3e3),
+                    help='steps for random policy to explore')
+parser.add_argument('--update_every', type=int,
+                    default=50, help='training frequency')
+
+parser.add_argument('--gamma', type=float, default=0.99,
+                    help='Discounted Factor')
+parser.add_argument('--net_width', type=int,
+                    default=200, help='Hidden net width')
+parser.add_argument('--lr', type=float, default=1e-4, help='Learning rate')
+parser.add_argument('--batch_size', type=int, default=256,
+                    help='length of sliced trajectory')
+parser.add_argument('--exp_noise', type=float,
+                    default=0.2, help='explore noise')
+parser.add_argument('--noise_decay', type=float, default=0.99,
+                    help='decay rate of explore noise')
+parser.add_argument('--Double', type=str2bool, default=True,
+                    help='Whether to use Double Q-learning')
+parser.add_argument('--Duel', type=str2bool, default=True,
+                    help='Whether to use Dueling networks')
+opt = parser.parse_args()
+opt.dvc = torch.device(opt.dvc)  # from str to torch.device
+print(opt)
+
+
+def main():
+    EnvName = ['CartPole-v1', 'LunarLander-v2']
+    BriefEnvName = ['PM_DQN', 'CPV1', 'LLdV2']
+    # env = gym.make(EnvName[opt.EnvIdex], render_mode = "human" if opt.render else None)
+    # eval_env = gym.make(EnvName[opt.EnvIdex])
+    env = PartitionMazeEnv()
+    eval_env = PartitionMazeEnv()
+    opt.state_dim = env.observation_space.shape[0]
+    opt.action_dim = env.action_space.n
+    opt.max_e_steps = 50
+
+    # Algorithm Setting
+    if opt.Duel:
+        algo_name = 'Duel'
+    else:
+        algo_name = ''
+    if opt.Double:
+        algo_name += 'DDQN'
+    else:
+        algo_name += 'DQN'
+
+    # Seed Everything
+    env_seed = opt.seed
+    torch.manual_seed(opt.seed)
+    torch.cuda.manual_seed(opt.seed)
+    torch.backends.cudnn.deterministic = True
+    torch.backends.cudnn.benchmark = False
+    print("Random Seed: {}".format(opt.seed))
+
+    print('Algorithm:', algo_name, ' Env:', BriefEnvName[opt.EnvIdex], ' state_dim:', opt.state_dim,
+          ' action_dim:', opt.action_dim, ' Random Seed:', opt.seed, ' max_e_steps:', opt.max_e_steps, '\n')
+
+    if opt.write:
+        from torch.utils.tensorboard import SummaryWriter
+        timenow = str(datetime.now())[0:-10]
+        timenow = ' ' + timenow[0:13] + '_' + timenow[-2::]
+        writepath = 'runs/{}-{}_S{}_'.format(algo_name,
+                                             BriefEnvName[opt.EnvIdex], opt.seed) + timenow
+        if os.path.exists(writepath):
+            shutil.rmtree(writepath)
+        writer = SummaryWriter(log_dir=writepath)
+
+    # Build model and replay buffer
+    if not os.path.exists('model'):
+        os.mkdir('model')
+    agent = DQN_agent(**vars(opt))
+    if opt.Loadmodel:
+        agent.load(algo_name, BriefEnvName[opt.EnvIdex], opt.ModelIdex)
+
+    if opt.render:
+        while True:
+            score = evaluate_policy(env, agent, 1)
+            print('EnvName:', BriefEnvName[opt.EnvIdex],
+                  'seed:', opt.seed, 'score:', score)
+    else:
+        total_steps = 0
+        while total_steps < opt.Max_train_steps:
+            # Do not use opt.seed directly, or it can overfit to opt.seed
+            s = env.reset(seed=env_seed)
+            env_seed += 1
+            done = False
+
+            '''Interact & train'''
+            while not done:
+                # e-greedy exploration
+                if total_steps < opt.random_steps:
+                    a = env.action_space.sample()
+                else:
+                    a = agent.select_action(s, deterministic=False)
+                s_next, r, dw, tr, info = env.step(a)
+                done = (dw or tr)
+
+                agent.replay_buffer.add(s, a, r, s_next, dw)
+                s = s_next
+
+                '''Update'''
+                # train 50 times every 50 steps rather than 1 training per step. Better!
+                if total_steps >= opt.random_steps and total_steps % opt.update_every == 0:
+                    for j in range(opt.update_every):
+                        agent.train()
+
+                '''Noise decay & Record & Log'''
+                if total_steps % 1000 == 0:
+                    agent.exp_noise *= opt.noise_decay
+                if total_steps % opt.eval_interval == 0:
+                    score = evaluate_policy(eval_env, agent, turns=3)
+                    if opt.write:
+                        writer.add_scalar(
+                            'ep_r', score, global_step=total_steps)
+                        writer.add_scalar(
+                            'noise', agent.exp_noise, global_step=total_steps)
+                    print('EnvName:', BriefEnvName[opt.EnvIdex], 'seed:', opt.seed, 'steps: {}k'.format(
+                        int(total_steps/1000)), 'score:', int(score))
+                total_steps += 1
+
+                '''save model'''
+                if total_steps % opt.save_interval == 0:
+                    agent.save(algo_name, BriefEnvName[opt.EnvIdex], int(
+                        total_steps/1000))
+    env.close()
+    eval_env.close()
+
+
+if __name__ == '__main__':
+    main()
diff --git a/Duel_Double_DQN/utils.py b/Duel_Double_DQN/utils.py
new file mode 100644
index 0000000..bd342ff
--- /dev/null
+++ b/Duel_Double_DQN/utils.py
@@ -0,0 +1,28 @@
+def evaluate_policy(env, agent, turns = 3):
+    total_scores = 0
+    for j in range(turns):
+        s = env.reset()
+        done = False
+        while not done:
+            # Take deterministic actions at test time
+            a = agent.select_action(s, deterministic=True)
+            s_next, r, dw, tr, info = env.step(a)
+            done = (dw or tr)
+
+            total_scores += r
+            s = s_next
+    return int(total_scores/turns)
+
+
+# You can just ignore this function. It is not related to the RL part.
+def str2bool(v):
+    '''transfer str to bool for argparse'''
+    if isinstance(v, bool):
+        return v
+    if v.lower() in ('yes', 'True', 'true', 'TRUE', 't', 'y', '1'):
+        return True
+    elif v.lower() in ('no', 'False', 'false', 'FALSE', 'f', 'n', '0'):
+        return False
+    else:
+        print('Wrong Input.')
+        raise ValueError('str2bool expects a boolean-like string')
\ No newline at end of file
diff --git a/env.py b/env.py
index 6c94765..12c7966 100644
--- a/env.py
+++ b/env.py
@@ -39,8 +39,8 @@ class PartitionMazeEnv(gym.Env):
         ##############################
         # 可能需要手动修改的超参数
         ##############################
-        self.CUT_NUM = 2  # 横切一半,竖切一半
-        self.BASE_LINE = 4000  # 基准时间,通过greedy或者蒙特卡洛计算出来
+        self.CUT_NUM = 6  # half of the cuts are vertical, half horizontal
+        self.BASE_LINE = 12000  # baseline time, obtained from a greedy or Monte Carlo solution
 
         self.phase = 0  # 阶段控制,0:区域划分阶段,1:迷宫初始化阶段,2:走迷宫阶段
         self.partition_step = 0  # 区域划分阶段步数,范围 0~4
diff --git a/env_dis.py b/env_dis.py
new file mode 100644
index 0000000..f7d55f5
--- /dev/null
+++ b/env_dis.py
@@ -0,0 +1,278 @@
+import gymnasium as gym
+from gymnasium import spaces
+import numpy as np
+import yaml
+import math
+
+
+class PartitionMazeEnv(gym.Env):
+    """
+    Custom environment with two stages:
+    Stage 0: region partitioning (4 steps; each step outputs one scalar that fixes a vertical or horizontal cut position).
+    Cut order: step 1 outputs c₁, step 2 outputs c₂, step 3 outputs r₁, step 4 outputs r₂.
+    After discretization the values can only be {0, 0.1, 0.2, ..., 0.9} (0 means no cut).
+    Stage 1: vehicle path planning (maze walking); the vehicles start from the map center and move up/down/left/right
+    over the grid cells until all target cells are covered or the step limit is reached.
+    """
+
+    def __init__(self, config=None):
+        super(PartitionMazeEnv, self).__init__()
+        # Fleet parameter settings
+        with open('params.yml', 'r', encoding='utf-8') as file:
+            params = yaml.safe_load(file)
+
+        self.H = params['H']
+        self.W = params['W']
+        self.num_cars = params['num_cars']
+
+        self.flight_time_factor = params['flight_time_factor']
+        self.comp_time_factor = params['comp_time_factor']
+        self.trans_time_factor = params['trans_time_factor']
+        self.car_time_factor = params['car_time_factor']
+        self.bs_time_factor = params['bs_time_factor']
+
+        self.flight_energy_factor = params['flight_energy_factor']
+        self.comp_energy_factor = params['comp_energy_factor']
+        self.trans_energy_factor = params['trans_energy_factor']
+        self.battery_energy_capacity = params['battery_energy_capacity']
+
+        ##############################
+        # Hyperparameters that may need manual tuning
+        ##############################
+        self.CUT_NUM = 4  # half of the cuts are vertical, half horizontal
+        self.BASE_LINE = 4000  # baseline time, obtained from a greedy or Monte Carlo solution
+
+        self.phase = 0  # phase control: 0 = partition stage, 1 = maze initialization stage, 2 = maze walking stage
+        self.partition_step = 0  # step counter of the partition stage, range 0~4
+        self.partition_values = np.zeros(
+            self.CUT_NUM, dtype=np.float32)  # stores c₁, c₂, r₁, r₂
+
+        # Action space: 14 discrete actions
+        # The first 10 are cut actions {0, 0.1, ..., 0.9}, the last 4 are up/down/left/right moves
+        self.action_space = spaces.Discrete(14)
+
+        # Observation space: a (1 + CUT_NUM + 2 * num_cars)-dimensional vector
+        # TODO the returned state currently only contains position coordinates
+        # Stage 0 state: the first CUT_NUM entries hold the decided cut values (undecided entries are 0)
+        # Stage 1 state: vehicle positions (2D each)
+        self.observation_space = spaces.Box(
+            low=0.0, high=1.0, shape=(1 + self.CUT_NUM + 2 * self.num_cars,), dtype=np.float32)
+
+        # Partition-stage variables
+        self.col_cuts = []  # vertical cut positions (c₁, c₂); a value of 0 means no cut
+        self.row_cuts = []  # horizontal cut positions (r₁, r₂)
+
+        self.init_maze_step = 0
+
+        # Path-planning-stage variables
+        self.MAX_STEPS = 50  # upper limit on maze-walking steps
+        self.step_count = 0
+        self.rectangles = {}
+        self.car_pos = [(self.H / 2, self.W / 2) for _ in range(self.num_cars)]
+        self.car_traj = [[] for _ in range(self.num_cars)]
+        self.current_car_index = 0
+
+    def reset(self, seed=None, options=None):
+        # Reset all variables and return to the partition stage (phase 0)
+        self.phase = 0
+        self.partition_step = 0
+        self.partition_values = np.zeros(self.CUT_NUM, dtype=np.float32)
+        self.col_cuts = []
+        self.row_cuts = []
+        self.init_maze_step = 0
+        self.region_centers = []
+        self.step_count = 0
+        self.rectangles = {}
+        self.car_pos = [(self.H / 2, self.W / 2) for _ in range(self.num_cars)]
+        self.car_traj = [[] for _ in range(self.num_cars)]
+        self.current_car_index = 0
+        # State: phase flag and partition_values first, the remaining entries padded with 0
+        state = np.concatenate(
+            [[self.phase], self.partition_values, np.zeros(np.array(self.car_pos).flatten().shape[0], dtype=np.float32)])
+        return state
+
+    def step(self, action):
+        # Actions are discrete in every stage
+        if self.phase == 0:
+            # Partition stage: the first 10 actions map to {0, 0.1, ..., 0.9}
+            disc_val = action * 0.1  # the action index maps directly to a cut ratio
+            self.partition_values[self.partition_step] = disc_val
+            self.partition_step += 1
+
+            # Current state: the first partition_step entries are decided values, the rest are 0, then zero padding
+            state = np.concatenate(
+                [[self.phase], self.partition_values, np.zeros(
+                    np.array(self.car_pos).flatten().shape[0], dtype=np.float32)]
+            )
+
+            # If fewer than CUT_NUM cuts are done we stay in the partition stage: no reward, done is False
+            if self.partition_step < self.CUT_NUM:
+                return state, 0.0, False, False, {}
+            else:
+                # After the last cut step, compute the partition boundaries
+                # Filter out zeros, deduplicate and sort
+                vert = sorted(set(v for v in self.partition_values[:len(
+                    self.partition_values) // 2] if v > 0))
+                horiz = sorted(set(v for v in self.partition_values[len(
+                    self.partition_values) // 2:] if v > 0))
+                vertical_cuts = vert if vert else []
+                horizontal_cuts = horiz if horiz else []
+
+                # Boundaries always include 0 and 1
+                self.col_cuts = [0.0] + vertical_cuts + [1.0]
+                self.row_cuts = [0.0] + horizontal_cuts + [1.0]
+
+                # Check whether the partition is feasible and compute the task offloading ratio ρ of every region
+                valid_partition = True
+                for i in range(len(self.row_cuts) - 1):
+                    for j in range(len(self.col_cuts) - 1):
+                        d = (self.col_cuts[j+1] - self.col_cuts[j]) * self.W * \
+                            (self.row_cuts[i+1] - self.row_cuts[i]) * self.H
+                        rho_time_limit = (self.flight_time_factor - self.trans_time_factor) / \
+                            (self.comp_time_factor - self.trans_time_factor)
+                        rho_energy_limit = (self.battery_energy_capacity - self.flight_energy_factor * d - self.trans_energy_factor * d) / \
+                            (self.comp_energy_factor * d -
+                             self.trans_energy_factor * d)
+                        if rho_energy_limit < 0:
+                            valid_partition = False
+                            break
+                        rho = min(rho_time_limit, rho_energy_limit)
+
+                        flight_time = self.flight_time_factor * d
+                        bs_time = self.bs_time_factor * (1 - rho) * d
+
+                        self.rectangles[(i, j)] = {
+                            'center': ((self.row_cuts[i] + self.row_cuts[i+1]) * self.H / 2, (self.col_cuts[j+1] + self.col_cuts[j]) * self.W / 2),
+                            'flight_time': flight_time,
+                            'bs_time': bs_time,
+                            'is_visited': False
+                        }
+                    if not valid_partition:
+                        break
+
+                if not valid_partition:
+                    reward = -10000
+                    state = np.concatenate(
+                        [[self.phase], self.partition_values, np.zeros(np.array(self.car_pos).flatten().shape[0], dtype=np.float32)])
+                    return state, reward, True, False, {}
+                else:
+                    # Enter stage 1: initialize the maze
+                    self.phase = 1
+                    state = np.concatenate(
+                        [[self.phase], self.partition_values, np.array(self.car_pos).flatten()])
+                    reward = 10
+
+                    # Build a reverse index (center -> grid cell) for later lookups
+                    self.reverse_rectangles = {
+                        v['center']: k for k, v in self.rectangles.items()}
+                return state, reward, False, False, {}
+
+        elif self.phase == 1:
+            # TODO stage 1 could be folded away entirely (the action is ignored here)!!!
+            # Stage 1: initialize the maze; the vehicles leave the map center for the nearest region centers
+            region_centers = [
+                (i, j, self.rectangles[(i, j)]['center'])
+                for i in range(len(self.row_cuts) - 1)
+                for j in range(len(self.col_cuts) - 1)
+            ]
+            # Sort by distance to the map center, nearest first
+            region_centers.sort(
+                key=lambda x: math.dist(x[2], (self.H / 2, self.W / 2))
+            )
+
+            # Assign the nearest region to each vehicle
+            for idx in range(self.num_cars):
+                i, j, center = region_centers[idx]
+                self.car_pos[idx] = center
+                self.car_traj[idx].append((i, j))
+                self.rectangles[(i, j)]['is_visited'] = True
+
+            # Enter stage 2: walk the maze
+            self.phase = 2
+            state = np.concatenate(
+                [[self.phase], self.partition_values,
+                 np.array(self.car_pos).flatten()]
+            )
+            return state, 0.0, False, False, {}
+
+        elif self.phase == 2:
+            # Stage 2: path planning (maze walking)
+            # The last 4 actions correspond to up/down/left/right moves
+            current_car = self.current_car_index
+            current_row, current_col = self.reverse_rectangles[self.car_pos[current_car]]
+
+            # Initialize the new row/column with the current values
+            new_row, new_col = current_row, current_col
+
+            if action == 10 and current_row > 0:  # up
+                new_row = current_row - 1
+            elif action == 11 and current_row < len(self.row_cuts) - 2:  # down
+                new_row = current_row + 1
+            elif action == 12 and current_col > 0:  # left
+                new_col = current_col - 1
+            elif action == 13 and current_col < len(self.col_cuts) - 2:  # right
+                new_col = current_col + 1
+
+            # Update the vehicle position
+            self.car_pos[current_car] = self.rectangles[(
+                new_row, new_col)]['center']
+            if new_row != current_row or new_col != current_col:
+                self.car_traj[current_car].append((new_row, new_col))
+            self.step_count += 1
+            self.current_car_index = (
+                self.current_car_index + 1) % self.num_cars
+
+            # Update the visit flag: mark the new cell as visited
+            self.rectangles[(new_row, new_col)]['is_visited'] = True
+
+            # Observation state
+            state = np.concatenate(
+                [[self.phase], self.partition_values, np.array(self.car_pos).flatten()])
+            reward = 0
+
+            # Episode termination: every cell has been visited or the step limit is reached
+            done = all([value['is_visited'] for _, value in self.rectangles.items()]) or (
+                self.step_count >= self.MAX_STEPS)
+            if done and all([value['is_visited'] for _, value in self.rectangles.items()]):
+                # Coverage complete: compute each motorcade's execution time from its trajectory
+                T = max([self._compute_motorcade_time(idx)
+                        for idx in range(self.num_cars)])
+                # print(T)
+                # print(self.partition_values)
+                # print(self.car_traj)
+                reward += self.BASE_LINE / T * 100
+            elif done and self.step_count >= self.MAX_STEPS:
+                reward += -1000
+
+            return state, reward, done, False, {}
+
+    def _compute_motorcade_time(self, idx):
+        flight_time = sum(self.rectangles[tuple(point)]['flight_time']
+                          for point in self.car_traj[idx])
+        bs_time = sum(self.rectangles[tuple(point)]['bs_time']
+                      for point in self.car_traj[idx])
+
+        # Vehicle travel time; the map center is added to the head and tail of the trajectory
+        car_time = 0
+        for i in range(len(self.car_traj[idx]) - 1):
+            first_point = self.car_traj[idx][i]
+            second_point = self.car_traj[idx][i + 1]
+            car_time += math.dist(self.rectangles[first_point]['center'], self.rectangles[second_point]['center']) * \
+                self.car_time_factor
+        car_time += math.dist(self.rectangles[self.car_traj[idx][0]]['center'], [
+                              self.H / 2, self.W / 2]) * self.car_time_factor
+        car_time += math.dist(self.rectangles[self.car_traj[idx][-1]]['center'], [
+                              self.H / 2, self.W / 2]) * self.car_time_factor
+
+        return max(float(car_time) + flight_time, bs_time)
+
+    def render(self):
+        if self.phase == 1:
+            print("Phase 1: Initialize maze environment.")
+            print(f"Partition values so far: {self.partition_values}")
+            print(f"Motorcade position: {self.car_pos}")
+            # input('1111')
+        elif self.phase == 2:
+            print("Phase 2: Play maze.")
+            print(f'Motorcade trajectory: {self.car_traj}')
+            # input('2222')
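
A quick way to exercise the new discrete environment before wiring it into training is a random rollout that respects the phase-dependent action ranges (cut actions 0~9 while the phase flag state[0] is 0, move actions 10~13 afterwards), mirroring the masking in DQN_agent.select_action. The sketch below is a hypothetical helper script (smoke_test_env.py, not part of this patch) and assumes params.yml is present in the working directory:

# smoke_test_env.py -- hypothetical smoke test, not included in the diff above
import numpy as np
from env_dis import PartitionMazeEnv

env = PartitionMazeEnv()            # reads params.yml from the current directory
s = env.reset()                     # reset() returns the raw state vector
done, total_r, steps = False, 0.0, 0
while not done and steps < 200:
    if s[0] == 0:                   # partition phase: pick a cut ratio (actions 0~9)
        a = np.random.randint(0, 10)
    else:                           # maze phases: pick a move (actions 10~13)
        a = np.random.randint(10, 14)
    s, r, dw, tr, info = env.step(a)    # gymnasium-style 5-tuple
    done = dw or tr
    total_r += r
    steps += 1
print(f"episode finished after {steps} steps, return = {total_r:.1f}")

Training with the Duel/Double variants would then be launched from the repository root through the new entry point, for example: python Duel_Double_DQN/main.py --dvc cpu --Duel True --Double True --write False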