Add DQN algorithm

This commit is contained in:
weixin_46229132 2025-03-18 21:16:48 +08:00
parent 343008bc9f
commit f19e8fbdbf
7 changed files with 615 additions and 314 deletions

View File

@@ -1,254 +0,0 @@
"""
Deep Q Network off-policy
"""
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
np.random.seed(42)
torch.manual_seed(2)
class Network(nn.Module):
"""
Network Structure
"""
def __init__(self,
n_features,
n_actions,
n_neuron=10
):
super(Network, self).__init__()
self.net = nn.Sequential(
nn.Linear(in_features=n_features, out_features=n_neuron, bias=True),
nn.ReLU(),
nn.Linear(in_features=n_neuron, out_features=n_actions, bias=True)
)
def forward(self, s):
"""
:param s: s
:return: q
"""
q = self.net(s)
return q
class DeepQNetwork(nn.Module):
"""
Q Learning Algorithm
"""
def __init__(self,
n_actions,
n_features,
learning_rate=0.01,
reward_decay=0.9,
e_greedy=0.9,
replace_target_iter=300,
memory_size=500,
batch_size=32,
e_greedy_increment=None):
super(DeepQNetwork, self).__init__()
self.n_actions = n_actions
self.n_features = n_features
self.lr = learning_rate
self.gamma = reward_decay
self.epsilon_max = e_greedy
self.replace_target_iter = replace_target_iter
self.memory_size = memory_size
self.batch_size = batch_size
self.epsilon_increment = e_greedy_increment
self.epsilon = 0 if e_greedy_increment is not None else self.epsilon_max
# total learning step
self.learn_step_counter = 0
# initialize zero memory [s, a, r, s_]
# here a pd.DataFrame table is used as the memory
# the number of rows is the memory size, i.e. the number of stored transitions
# the number of columns is the length of one transition [s, a, r, s_], where a and r are scalars and s, s_ each have length n_features
self.memory = pd.DataFrame(np.zeros((self.memory_size, self.n_features*2+2)))
# build two network: eval_net and target_net
self.eval_net = Network(n_features=self.n_features, n_actions=self.n_actions)
self.target_net = Network(n_features=self.n_features, n_actions=self.n_actions)
self.loss_function = nn.MSELoss()
self.optimizer = torch.optim.Adam(self.eval_net.parameters(), lr=self.lr)
# record the loss of every learning step
self.cost_his = []
def store_transition(self, s, a, r, s_):
if not hasattr(self, 'memory_counter'):
# hasattr checks whether the object already has the given attribute
self.memory_counter = 0
transition = np.hstack((s, [a,r], s_))
# replace the old memory with new memory
index = self.memory_counter % self.memory_size
self.memory.iloc[index, :] = transition
self.memory_counter += 1
def choose_action(self, observation):
observation = observation[np.newaxis, :]
if np.random.uniform() < self.epsilon:
# forward feed the observation and get q value for every actions
s = torch.FloatTensor(observation)
actions_value = self.eval_net(s)
action = np.argmax(actions_value.detach().numpy())
else:
action = np.random.randint(0, self.n_actions)
return action
def _replace_target_params(self):
# copy the eval network parameters into the target network
self.target_net.load_state_dict(self.eval_net.state_dict())
def learn(self):
# check to replace target parameters
if self.learn_step_counter % self.replace_target_iter == 0:
self._replace_target_params()
print('\ntarget params replaced\n')
# sample batch memory from all memory
batch_memory = self.memory.sample(self.batch_size) \
if self.memory_counter > self.memory_size \
else self.memory.iloc[:self.memory_counter].sample(self.batch_size, replace=True)
# run the network
s = torch.FloatTensor(batch_memory.iloc[:, :self.n_features].values)
s_ = torch.FloatTensor(batch_memory.iloc[:, -self.n_features:].values)
q_eval = self.eval_net(s)
q_next = self.target_net(s_)
# change q_target w.r.t q_eval's action
q_target = q_eval.clone()
# update the target values for the taken actions
batch_index = np.arange(self.batch_size, dtype=np.int32)
eval_act_index = batch_memory.iloc[:, self.n_features].values.astype(int)
reward = batch_memory.iloc[:, self.n_features + 1].values
q_target[batch_index, eval_act_index] = torch.FloatTensor(reward) + self.gamma * q_next.max(dim=1).values
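# This is the standard DQN regression target: for each sampled transition j,
# y_j = r_j + gamma * max_a' Q_target(s'_j, a'), written into q_target only at the
# action actually taken, so the loss below is non-zero only for those entries.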
# train eval network
loss = self.loss_function(q_eval, q_target.detach())  # detach the target so gradients only flow through eval_net
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
self.cost_his.append(loss.detach().numpy())
# increasing epsilon
self.epsilon = self.epsilon + self.epsilon_increment if self.epsilon < self.epsilon_max else self.epsilon_max
self.learn_step_counter += 1
def plot_cost(self):
plt.figure()
plt.plot(np.arange(len(self.cost_his)), self.cost_his)
plt.show()

View File

@@ -1,58 +0,0 @@
from RL_brain import DeepQNetwork
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from env import PartitionMazeEnv
def run_maze():
step = 0  # step counter; learning starts only after enough transitions have been accumulated in memory
for episode in range(200):
# initial observation
observation = env.reset()
while True:
# refresh env
env.render()
# RL choose action based on observation
action = RL.choose_action(observation)
# RL take action and get next observation and reward
observation_, reward, done = env.step(action)
# !! restore transition
RL.store_transition(observation, action, reward, observation_)
# after more than 200 transitions have been stored, learn once every 5 steps
if (step > 200) and (step % 5 == 0):
RL.learn()
# swap observation
observation = observation_
# break while loop when end of this episode
if done:
break
step += 1
# end of game
print("game over")
env.destroy()
if __name__ == "__main__":
# maze game
env = PartitionMazeEnv()
# TODO: this script is not finished yet and does not run!!!
RL = DeepQNetwork(env.n_actions, env.n_features,
learning_rate=0.01,
reward_decay=0.9,
e_greedy=0.9,
replace_target_iter=200,
memory_size=2000)
env.after(100, run_maze)
env.mainloop()
RL.plot_cost()

144
Duel_Double_DQN/DQN.py Normal file
View File

@@ -0,0 +1,144 @@
import torch.nn.functional as F
import torch.nn as nn
import numpy as np
import torch
import copy
def build_net(layer_shape, activation, output_activation):
'''Build a network with a for loop'''
layers = []
for j in range(len(layer_shape)-1):
act = activation if j < len(layer_shape)-2 else output_activation
layers += [nn.Linear(layer_shape[j], layer_shape[j+1]), act()]
return nn.Sequential(*layers)
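# For illustration: build_net([4, 64, 64, 2], nn.ReLU, nn.Identity) yields
# Linear(4,64) -> ReLU -> Linear(64,64) -> ReLU -> Linear(64,2) -> Identity,
# i.e. hidden layers use `activation` and the output layer uses `output_activation`.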
class Q_Net(nn.Module):
def __init__(self, state_dim, action_dim, hid_shape):
super(Q_Net, self).__init__()
layers = [state_dim] + list(hid_shape) + [action_dim]
self.Q = build_net(layers, nn.ReLU, nn.Identity)
def forward(self, s):
q = self.Q(s)
return q
class Duel_Q_Net(nn.Module):
def __init__(self, state_dim, action_dim, hid_shape):
super(Duel_Q_Net, self).__init__()
layers = [state_dim] + list(hid_shape)
self.hidden = build_net(layers, nn.ReLU, nn.ReLU)
self.V = nn.Linear(hid_shape[-1], 1)
self.A = nn.Linear(hid_shape[-1], action_dim)
def forward(self, s):
s = self.hidden(s)
Adv = self.A(s)
V = self.V(s)
Q = V + (Adv - torch.mean(Adv, dim=-1, keepdim=True)) # Q(s,a)=V(s)+A(s,a)-mean(A(s,a))
return Q
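# Dueling decomposition: subtracting the mean advantage makes V and A identifiable
# (adding a constant to A and subtracting it from V would otherwise leave Q unchanged),
# so V(s) tracks the state value and A(s,a) the relative preference between actions.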
class DQN_agent(object):
def __init__(self, **kwargs):
# Init hyperparameters for agent, just like "self.gamma = opt.gamma, self.lambd = opt.lambd, ..."
self.__dict__.update(kwargs)
self.tau = 0.005
self.replay_buffer = ReplayBuffer(self.state_dim, self.dvc, max_size=int(1e6))
if self.Duel:
self.q_net = Duel_Q_Net(self.state_dim, self.action_dim, (self.net_width,self.net_width)).to(self.dvc)
else:
self.q_net = Q_Net(self.state_dim, self.action_dim, (self.net_width, self.net_width)).to(self.dvc)
self.q_net_optimizer = torch.optim.Adam(self.q_net.parameters(), lr=self.lr)
self.q_target = copy.deepcopy(self.q_net)
# Freeze target networks with respect to optimizers (only update via polyak averaging)
for p in self.q_target.parameters(): p.requires_grad = False
def select_action(self, state, deterministic):  # only used when interacting with the env
with torch.no_grad():
state = torch.FloatTensor(state.reshape(1, -1)).to(self.dvc)
# if deterministic:
# a = self.q_net(state).argmax().item()
# else:
if np.random.rand() < self.exp_noise:
# random exploration, restricted to the actions valid in the current phase (state[0][0] is the phase flag)
if state[0][0] == 0:
a = np.random.randint(0, 10)  # phase 0: partition actions 0-9
else:
a = np.random.randint(10, 14)  # later phases: movement actions 10-13
else:
# greedy action with the invalid actions masked out (q_value has shape (1, action_dim))
q_value = self.q_net(state)
if state[0][0] == 0:
q_value[:, 10:] = -float('inf')  # partition phase: mask the movement actions
else:
q_value[:, :10] = -float('inf')  # maze phase: mask the partition actions
a = q_value.argmax().item()
return a
def train(self):
s, a, r, s_next, dw = self.replay_buffer.sample(self.batch_size)
'''Compute the target Q value'''
with torch.no_grad():
if self.Double:
argmax_a = self.q_net(s_next).argmax(dim=1).unsqueeze(-1)
max_q_next = self.q_target(s_next).gather(1,argmax_a)
else:
max_q_next = self.q_target(s_next).max(1)[0].unsqueeze(1)
target_Q = r + (~dw) * self.gamma * max_q_next  # dw: dead or win, i.e. a true terminal state (no bootstrapping)
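# Double DQN target: the online net picks argmax_a Q(s',a) and the target net evaluates it,
# y = r + (1 - dw) * gamma * Q_target(s', argmax_a Q_online(s', a)),
# which reduces the overestimation bias of the vanilla max-based target in the else-branch.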
# Get current Q estimates
current_q = self.q_net(s)
current_q_a = current_q.gather(1,a)
q_loss = F.mse_loss(current_q_a, target_Q)
self.q_net_optimizer.zero_grad()
q_loss.backward()
self.q_net_optimizer.step()
# Update the frozen target models
for param, target_param in zip(self.q_net.parameters(), self.q_target.parameters()):
target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
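# Soft (Polyak) target update: theta_target <- tau * theta + (1 - tau) * theta_target
# with tau = 0.005, instead of copying the whole network every N steps.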
def save(self,algo,EnvName,steps):
torch.save(self.q_net.state_dict(), "./model/{}_{}_{}.pth".format(algo, EnvName, steps))  # save into ./model so that load() finds it
def load(self,algo,EnvName,steps):
self.q_net.load_state_dict(torch.load("./model/{}_{}_{}.pth".format(algo,EnvName,steps),map_location=self.dvc))
self.q_target.load_state_dict(torch.load("./model/{}_{}_{}.pth".format(algo,EnvName,steps),map_location=self.dvc))
class ReplayBuffer(object):
def __init__(self, state_dim, dvc, max_size=int(1e6)):
self.max_size = max_size
self.dvc = dvc
self.ptr = 0
self.size = 0
self.s = torch.zeros((max_size, state_dim),dtype=torch.float,device=self.dvc)
self.a = torch.zeros((max_size, 1),dtype=torch.long,device=self.dvc)
self.r = torch.zeros((max_size, 1),dtype=torch.float,device=self.dvc)
self.s_next = torch.zeros((max_size, state_dim),dtype=torch.float,device=self.dvc)
self.dw = torch.zeros((max_size, 1),dtype=torch.bool,device=self.dvc)
def add(self, s, a, r, s_next, dw):
self.s[self.ptr] = torch.from_numpy(s).to(self.dvc)
self.a[self.ptr] = a
self.r[self.ptr] = r
self.s_next[self.ptr] = torch.from_numpy(s_next).to(self.dvc)
self.dw[self.ptr] = dw
self.ptr = (self.ptr + 1) % self.max_size
self.size = min(self.size + 1, self.max_size)
def sample(self, batch_size):
ind = torch.randint(0, self.size, device=self.dvc, size=(batch_size,))
return self.s[ind], self.a[ind], self.r[ind], self.s_next[ind], self.dw[ind]
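# Minimal usage sketch (illustrative only; in this repo the buffer is driven by main.py,
# and state_dim=7 below is just an assumed example value):
#   buf = ReplayBuffer(state_dim=7, dvc=torch.device('cpu'), max_size=10000)
#   buf.add(s, a, r, s_next, dw)           # s, s_next: np.float32 arrays of length state_dim
#   s, a, r, s_next, dw = buf.sample(256)  # uniform mini-batch of tensors already on dvc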

163
Duel_Double_DQN/main.py Normal file
View File

@@ -0,0 +1,163 @@
import gymnasium as gym
import os
import shutil
import argparse
import torch
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from env_dis import PartitionMazeEnv
from utils import evaluate_policy, str2bool
from datetime import datetime
from DQN import DQN_agent
'''Hyperparameter Setting'''
parser = argparse.ArgumentParser()
parser.add_argument('--dvc', type=str, default='cpu',
help='running device: cuda or cpu')
parser.add_argument('--EnvIdex', type=int, default=0, help='CP-v1, LLd-v2')
parser.add_argument('--write', type=str2bool, default=False,
help='Use SummaryWriter to record the training')
parser.add_argument('--render', type=str2bool,
default=False, help='Render or Not')
parser.add_argument('--Loadmodel', type=str2bool,
default=False, help='Load pretrained model or Not')
parser.add_argument('--ModelIdex', type=int, default=100,
help='which model to load')
parser.add_argument('--seed', type=int, default=42, help='random seed')
parser.add_argument('--Max_train_steps', type=int,
default=int(1e8), help='Max training steps')
parser.add_argument('--save_interval', type=int,
default=int(50e3), help='Model saving interval, in steps.')
parser.add_argument('--eval_interval', type=int, default=int(2e3),
help='Model evaluating interval, in steps.')
parser.add_argument('--random_steps', type=int, default=int(3e3),
help='steps for random policy to explore')
parser.add_argument('--update_every', type=int,
default=50, help='training frequency')
parser.add_argument('--gamma', type=float, default=0.99,
help='Discounted Factor')
parser.add_argument('--net_width', type=int,
default=200, help='Hidden net width')
parser.add_argument('--lr', type=float, default=1e-4, help='Learning rate')
parser.add_argument('--batch_size', type=int, default=256,
help='mini-batch size')
parser.add_argument('--exp_noise', type=float,
default=0.2, help='explore noise')
parser.add_argument('--noise_decay', type=float, default=0.99,
help='decay rate of explore noise')
parser.add_argument('--Double', type=str2bool, default=True,
help='Whether to use Double Q-learning')
parser.add_argument('--Duel', type=str2bool, default=True,
help='Whether to use Duel networks')
opt = parser.parse_args()
opt.dvc = torch.device(opt.dvc) # from str to torch.device
print(opt)
def main():
EnvName = ['CartPole-v1', 'LunarLander-v2']
BriefEnvName = ['PM_DQN', 'CPV1', 'LLdV2']
# env = gym.make(EnvName[opt.EnvIdex], render_mode = "human" if opt.render else None)
# eval_env = gym.make(EnvName[opt.EnvIdex])
env = PartitionMazeEnv()
eval_env = PartitionMazeEnv()
opt.state_dim = env.observation_space.shape[0]
opt.action_dim = env.action_space.n
opt.max_e_steps = 50
# Algorithm Setting
if opt.Duel:
algo_name = 'Duel'
else:
algo_name = ''
if opt.Double:
algo_name += 'DDQN'
else:
algo_name += 'DQN'
# Seed Everything
env_seed = opt.seed
torch.manual_seed(opt.seed)
torch.cuda.manual_seed(opt.seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
print("Random Seed: {}".format(opt.seed))
print('Algorithm:', algo_name, ' Env:', BriefEnvName[opt.EnvIdex], ' state_dim:', opt.state_dim,
' action_dim:', opt.action_dim, ' Random Seed:', opt.seed, ' max_e_steps:', opt.max_e_steps, '\n')
if opt.write:
from torch.utils.tensorboard import SummaryWriter
timenow = str(datetime.now())[0:-10]
timenow = ' ' + timenow[0:13] + '_' + timenow[-2::]
writepath = 'runs/{}-{}_S{}_'.format(algo_name,
BriefEnvName[opt.EnvIdex], opt.seed) + timenow
if os.path.exists(writepath):
shutil.rmtree(writepath)
writer = SummaryWriter(log_dir=writepath)
# Build model and replay buffer
if not os.path.exists('model'):
os.mkdir('model')
agent = DQN_agent(**vars(opt))
if opt.Loadmodel:
agent.load(algo_name, BriefEnvName[opt.EnvIdex], opt.ModelIdex)
if opt.render:
while True:
score = evaluate_policy(env, agent, 1)
print('EnvName:', BriefEnvName[opt.EnvIdex],
'seed:', opt.seed, 'score:', score)
else:
total_steps = 0
while total_steps < opt.Max_train_steps:
# Do not use opt.seed directly, or it can overfit to opt.seed
s = env.reset(seed=env_seed)
env_seed += 1
done = False
'''Interact & train'''
while not done:
# e-greedy exploration
if total_steps < opt.random_steps:
a = env.action_space.sample()
else:
a = agent.select_action(s, deterministic=False)
s_next, r, dw, tr, info = env.step(a)
done = (dw or tr)
agent.replay_buffer.add(s, a, r, s_next, dw)
s = s_next
'''Update'''
# train 50 times every 50 steps rather than once per step. Better!
if total_steps >= opt.random_steps and total_steps % opt.update_every == 0:
for j in range(opt.update_every):
agent.train()
'''Noise decay & Record & Log'''
if total_steps % 1000 == 0:
agent.exp_noise *= opt.noise_decay
if total_steps % opt.eval_interval == 0:
score = evaluate_policy(eval_env, agent, turns=3)
if opt.write:
writer.add_scalar(
'ep_r', score, global_step=total_steps)
writer.add_scalar(
'noise', agent.exp_noise, global_step=total_steps)
print('EnvName:', BriefEnvName[opt.EnvIdex], 'seed:', opt.seed, 'steps: {}k'.format(
int(total_steps/1000)), 'score:', int(score))
total_steps += 1
'''save model'''
if total_steps % opt.save_interval == 0:
agent.save(algo_name, BriefEnvName[opt.EnvIdex], int(
total_steps/1000))
env.close()
eval_env.close()
if __name__ == '__main__':
main()

28
Duel_Double_DQN/utils.py Normal file
View File

@@ -0,0 +1,28 @@
def evaluate_policy(env, agent, turns = 3):
total_scores = 0
for j in range(turns):
s = env.reset()
done = False
while not done:
# Take deterministic actions at test time
a = agent.select_action(s, deterministic=True)
s_next, r, dw, tr, info = env.step(a)
done = (dw or tr)
total_scores += r
s = s_next
return int(total_scores/turns)
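# Note: this is the undiscounted episodic return averaged over `turns` greedy rollouts, truncated to an int.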
# You can just ignore this function. It is not related to the RL part.
def str2bool(v):
'''Convert a string to a bool for argparse.'''
if isinstance(v, bool):
return v
if v.lower() in ('yes', 'true', 't', 'y', '1'):
return True
elif v.lower() in ('no', 'false', 'f', 'n', '0'):
return False
else:
raise ValueError('Wrong input: expected a boolean-like value.')
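# Usage with argparse (as in main.py): parser.add_argument('--Double', type=str2bool, default=True)
# lets `--Double False` on the command line be parsed into the Python bool False.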

4
env.py
View File

@@ -39,8 +39,8 @@ class PartitionMazeEnv(gym.Env):
##############################
# hyperparameters that may need manual tuning
##############################
self.CUT_NUM = 2  # half horizontal cuts, half vertical cuts
self.BASE_LINE = 4000  # baseline time, computed via greedy or Monte Carlo
self.CUT_NUM = 6  # half horizontal cuts, half vertical cuts
self.BASE_LINE = 12000  # baseline time, computed via greedy or Monte Carlo
self.phase = 0  # phase control: 0 = partition phase, 1 = maze initialization, 2 = maze traversal
self.partition_step = 0  # partition-phase step counter, range 0~4

278
env_dis.py Normal file
View File

@@ -0,0 +1,278 @@
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import yaml
import math
class PartitionMazeEnv(gym.Env):
"""
自定义环境分为两阶段
阶段 0区域切分 4 每一步输出一个标量用于确定竖切和横切位置
切分顺序为第一步输出 c₁第二步输出 c₂第三步输出 r₁第四步输出 r₂
离散化后取值仅为 {0, 0.1, 0.2, , 0.9}其中 0 表示不切
阶段 1车辆路径规划走迷宫车辆从区域中心出发在九宫格内按照上下左右移动
直到所有目标格子被覆盖或步数上限达到
"""
def __init__(self, config=None):
super(PartitionMazeEnv, self).__init__()
# fleet parameter settings
with open('params.yml', 'r', encoding='utf-8') as file:
params = yaml.safe_load(file)
self.H = params['H']
self.W = params['W']
self.num_cars = params['num_cars']
self.flight_time_factor = params['flight_time_factor']
self.comp_time_factor = params['comp_time_factor']
self.trans_time_factor = params['trans_time_factor']
self.car_time_factor = params['car_time_factor']
self.bs_time_factor = params['bs_time_factor']
self.flight_energy_factor = params['flight_energy_factor']
self.comp_energy_factor = params['comp_energy_factor']
self.trans_energy_factor = params['trans_energy_factor']
self.battery_energy_capacity = params['battery_energy_capacity']
##############################
# hyperparameters that may need manual tuning
##############################
self.CUT_NUM = 4  # half horizontal cuts, half vertical cuts
self.BASE_LINE = 4000  # baseline time, computed via greedy or Monte Carlo
self.phase = 0  # phase control: 0 = partition phase, 1 = maze initialization, 2 = maze traversal
self.partition_step = 0  # partition-phase step counter, range 0~4
self.partition_values = np.zeros(
self.CUT_NUM, dtype=np.float32)  # stores c₁, c₂, r₁, r₂
# Define the action space: a discrete space of size 14.
# The first 10 actions are cut actions {0, 0.1, ..., 0.9}; the last 4 are up/down/left/right moves.
self.action_space = spaces.Discrete(14)
# Define the observation space: phase flag + CUT_NUM cut values + a 2D position per vehicle.
# TODO the returned state currently only contains position coordinates
# Phase 0 state: the first CUT_NUM dims are the decided cut values (undecided entries are 0)
# Phase 1 state: vehicle positions (2D)
self.observation_space = spaces.Box(
low=0.0, high=1.0, shape=(1 + self.CUT_NUM + 2 * self.num_cars,), dtype=np.float32)
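# For example, with CUT_NUM = 4 and num_cars = 2 (num_cars comes from params.yml; 2 is just an assumed
# example value) the observation is the 9-dim vector [phase, c₁, c₂, r₁, r₂, x₁, y₁, x₂, y₂];
# the position slots stay zero during phase 0.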
# partition-phase variables
self.col_cuts = []  # vertical cut positions (c₁, c₂); a value of 0 means no cut
self.row_cuts = []  # horizontal cut positions (r₁, r₂)
self.init_maze_step = 0
# path-planning-phase variables
self.MAX_STEPS = 50  # step limit for maze traversal
self.step_count = 0
self.rectangles = {}
self.car_pos = [(self.H / 2, self.W / 2) for _ in range(self.num_cars)]
self.car_traj = [[] for _ in range(self.num_cars)]
self.current_car_index = 0
def reset(self, seed=None, options=None):
# reset all variables and return to the partition phase (phase 0)
self.phase = 0
self.partition_step = 0
self.partition_values = np.zeros(self.CUT_NUM, dtype=np.float32)
self.col_cuts = []
self.row_cuts = []
self.init_maze_step = 0
self.region_centers = []
self.step_count = 0
self.rectangles = {}
self.car_pos = [(self.H / 2, self.W / 2) for _ in range(self.num_cars)]
self.car_traj = [[] for _ in range(self.num_cars)]
self.current_car_index = 0
# state: phase flag followed by partition_values, with the car-position slots padded with zeros
state = np.concatenate(
[[self.phase], self.partition_values, np.zeros(np.array(self.car_pos).flatten().shape[0], dtype=np.float32)])
return state
def step(self, action):
# actions are discrete in every phase
if self.phase == 0:
# partition phase: the first 10 actions map to {0, 0.1, ..., 0.9}
disc_val = action * 0.1  # map the action index directly to a cut ratio
self.partition_values[self.partition_step] = disc_val
self.partition_step += 1
# build the current state: the first partition_step cut values are decided, the rest are 0, then pad zeros for the car positions
state = np.concatenate(
[[self.phase], self.partition_values, np.zeros(
np.array(self.car_pos).flatten().shape[0], dtype=np.float32)]
)
# if not all cut steps are finished, stay in the partition phase: no reward, done = False
if self.partition_step < self.CUT_NUM:
return state, 0.0, False, False, {}
else:
# after the final cut step, compute the partition boundaries
# filter out zeros, deduplicate, then sort
vert = sorted(set(v for v in self.partition_values[:len(
self.partition_values) // 2] if v > 0))
horiz = sorted(set(v for v in self.partition_values[len(
self.partition_values) // 2:] if v > 0))
vertical_cuts = vert if vert else []
horizontal_cuts = horiz if horiz else []
# boundaries always include 0 and 1
self.col_cuts = [0.0] + vertical_cuts + [1.0]
self.row_cuts = [0.0] + horizontal_cuts + [1.0]
# check whether the partition is feasible and compute each region's task offloading ratio ρ
valid_partition = True
for i in range(len(self.row_cuts) - 1):
for j in range(len(self.col_cuts) - 1):
d = (self.col_cuts[j+1] - self.col_cuts[j]) * self.W * \
(self.row_cuts[i+1] - self.row_cuts[i]) * self.H
rho_time_limit = (self.flight_time_factor - self.trans_time_factor) / \
(self.comp_time_factor - self.trans_time_factor)
rho_energy_limit = (self.battery_energy_capacity - self.flight_energy_factor * d - self.trans_energy_factor * d) / \
(self.comp_energy_factor * d -
self.trans_energy_factor * d)
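# Interpretation (as far as the formulas suggest): rho is the fraction of the region's data
# processed on board; rho_time_limit comes from the per-sortie time budget and rho_energy_limit
# from the battery capacity, so a negative energy limit means the region is too large to be
# served at all and the partition is rejected below.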
if rho_energy_limit < 0:
valid_partition = False
break
rho = min(rho_time_limit, rho_energy_limit)
flight_time = self.flight_time_factor * d
bs_time = self.bs_time_factor * (1 - rho) * d
self.rectangles[(i, j)] = {
'center': ((self.row_cuts[i] + self.row_cuts[i+1]) * self.H / 2, (self.col_cuts[j+1] + self.col_cuts[j]) * self.W / 2),
'flight_time': flight_time,
'bs_time': bs_time,
'is_visited': False
}
if not valid_partition:
break
if not valid_partition:
reward = -10000
state = np.concatenate(
[[self.phase], self.partition_values, np.zeros(np.array(self.car_pos).flatten().shape[0], dtype=np.float32)])
return state, reward, True, False, {}
else:
# enter phase 1: initialize the maze
self.phase = 1
state = np.concatenate(
[[self.phase], self.partition_values, np.array(self.car_pos).flatten()])
reward = 10
# build a reverse index (center -> grid cell) for later lookups
self.reverse_rectangles = {
v['center']: k for k, v in self.rectangles.items()}
return state, reward, False, False, {}
elif self.phase == 1:
# TODO: phase 1 does not have to be a separate step!!!
# Phase 1: initialize the maze; send the vehicles from the map center to the nearest region centers
region_centers = [
(i, j, self.rectangles[(i, j)]['center'])
for i in range(len(self.row_cuts) - 1)
for j in range(len(self.col_cuts) - 1)
]
# sort the region centers by distance to the map center (H/2, W/2), nearest first
region_centers.sort(
key=lambda x: math.dist(x[2], (self.H / 2, self.W / 2))
)
# assign the nearest region to each vehicle
for idx in range(self.num_cars):
i, j, center = region_centers[idx]
self.car_pos[idx] = center
self.car_traj[idx].append((i, j))
self.rectangles[(i, j)]['is_visited'] = True
# enter phase 2: maze traversal
self.phase = 2
state = np.concatenate(
[[self.phase], self.partition_values,
np.array(self.car_pos).flatten()]
)
return state, 0.0, False, False, {}
elif self.phase == 2:
# Phase 2: path planning (maze traversal)
# the last 4 actions correspond to up/down/left/right moves
current_car = self.current_car_index
current_row, current_col = self.reverse_rectangles[self.car_pos[current_car]]
# initialize the new row/column to the current values
new_row, new_col = current_row, current_col
if action == 10 and current_row > 0:  # up
new_row = current_row - 1
elif action == 11 and current_row < len(self.row_cuts) - 2:  # down
new_row = current_row + 1
elif action == 12 and current_col > 0:  # left
new_col = current_col - 1
elif action == 13 and current_col < len(self.col_cuts) - 2:  # right
new_col = current_col + 1
# update the vehicle position
self.car_pos[current_car] = self.rectangles[(
new_row, new_col)]['center']
if new_row != current_row or new_col != current_col:
self.car_traj[current_car].append((new_row, new_col))
self.step_count += 1
self.current_car_index = (
self.current_car_index + 1) % self.num_cars
# update visit flags: mark the new cell as visited
self.rectangles[(new_row, new_col)]['is_visited'] = True
# build the observation
state = np.concatenate(
[[self.phase], self.partition_values, np.array(self.car_pos).flatten()])
reward = 0
# episode termination: all cells visited or the step limit reached
done = all([value['is_visited'] for _, value in self.rectangles.items()]) or (
self.step_count >= self.MAX_STEPS)
if done and all([value['is_visited'] for _, value in self.rectangles.items()]):
# full coverage achieved: compute each motorcade's execution time from its trajectory
T = max([self._compute_motorcade_time(idx)
for idx in range(self.num_cars)])
# print(T)
# print(self.partition_values)
# print(self.car_traj)
reward += self.BASE_LINE / T * 100
elif done and self.step_count >= self.MAX_STEPS:
reward += -1000
return state, reward, done, False, {}
def _compute_motorcade_time(self, idx):
flight_time = sum(self.rectangles[tuple(point)]['flight_time']
for point in self.car_traj[idx])
bs_time = sum(self.rectangles[tuple(point)]['bs_time']
for point in self.car_traj[idx])
# compute the car travel time; the map center is added to the start and end of the trajectory
car_time = 0
for i in range(len(self.car_traj[idx]) - 1):
first_point = self.car_traj[idx][i]
second_point = self.car_traj[idx][i + 1]
car_time += math.dist(self.rectangles[first_point]['center'], self.rectangles[second_point]['center']) * \
self.car_time_factor
car_time += math.dist(self.rectangles[self.car_traj[idx][0]]['center'], [
self.H / 2, self.W / 2]) * self.car_time_factor
car_time += math.dist(self.rectangles[self.car_traj[idx][-1]]['center'], [
self.H / 2, self.W / 2]) * self.car_time_factor
return max(float(car_time) + flight_time, bs_time)
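# The motorcade finishes when both the ground/air chain (car travel plus UAV flight time)
# and the base-station processing of the offloaded data are done, hence the max(...) above.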
def render(self):
if self.phase == 1:
print("Phase 1: Initialize maze environment.")
print(f"Partition values so far: {self.partition_values}")
print(f"Motorcade positon: {self.car_pos}")
# input('1111')
elif self.phase == 2:
print("Phase 2: Play maze.")
print(f'Motorcade trajectory: {self.car_traj}')
# input('2222')