169 lines
6.8 KiB
Python
169 lines
6.8 KiB
Python
![]() |
import gymnasium as gym
|
|||
|
from gymnasium import spaces
|
|||
|
import numpy as np
|
|||
|
import yaml
|
|||
|
import math
|
|||
|
from mTSP_solver import mTSP
|
|||
|
|
|||
|
|
|||
|
class PartitionEnv(gym.Env):
    """Region-partition environment scored by an mTSP route plan.

    An episode lasts ``CUT_NUM`` steps. Each step supplies one continuous
    action in [0, 1] that is stored as a relative cut position: the first
    ``ROW_CUT_LIMIT`` values are row cuts, the remaining ones are column
    cuts, and a value of 0 means "no cut". After the last cut the H x W area
    is split into a grid of rectangles, each rectangle is checked against
    the energy budget, and an ``mTSP`` solver plans routes over the rectangle
    centres. The resulting time determines the terminal reward.
    """

    def __init__(self, config=None):
        """Load the fleet parameters and create the spaces and episode state.

        Args:
            config: Accepted for interface compatibility; not used here.

        Raises:
            OSError: If ``<self.params>.yml`` cannot be opened.
            KeyError: If a required key is missing from the parameter file.
        """
        super().__init__()

        ##############################
        # Hyper-parameters that may need manual tuning
        ##############################
        self.params = 'params3'   # base name of the YAML parameter file
        self.CUT_NUM = 2          # total number of cut decisions per episode
        self.ROW_CUT_LIMIT = 1    # leading entries of partition_values are row cuts
        self.COL_CUT_LIMIT = 1    # NOTE(review): not read anywhere in this class
        self.BASE_LINE = 5000     # terminal reward is BASE_LINE - best_time

        # Fleet / cost-model parameters
        with open(self.params + '.yml', 'r', encoding='utf-8') as file:
            params = yaml.safe_load(file)

        self.H = params['H']
        self.W = params['W']
        self.center = (self.H / 2, self.W / 2)
        self.num_cars = params['num_cars']

        self.flight_time_factor = params['flight_time_factor']
        self.comp_time_factor = params['comp_time_factor']
        self.trans_time_factor = params['trans_time_factor']
        self.car_time_factor = params['car_time_factor']
        self.bs_time_factor = params['bs_time_factor']

        self.flight_energy_factor = params['flight_energy_factor']
        self.comp_energy_factor = params['comp_energy_factor']
        self.trans_energy_factor = params['trans_energy_factor']
        self.battery_energy_capacity = params['battery_energy_capacity']

        # Episode state. ``phase`` is initialised here as well as in reset()
        # so that render() is safe to call before the first reset().
        self.phase = 0
        self.partition_step = 0  # number of cuts decided so far, 0..CUT_NUM
        self.partition_values = np.zeros(self.CUT_NUM, dtype=np.float32)

        # Every action is a single continuous value in [0, 1].
        self.action_space = spaces.Box(
            low=0.0, high=1.0, shape=(1,), dtype=np.float32)

        # The observation is the vector of cut values decided so far
        # (entries not yet decided are 0).
        self.observation_space = spaces.Box(
            low=0.0, high=1.0, shape=(self.CUT_NUM,), dtype=np.float32)

        # Derived partition data, filled in by step() once all cuts are made.
        self.col_cuts = []    # column boundaries, including 0.0 and 1.0
        self.row_cuts = []    # row boundaries, including 0.0 and 1.0
        self.rectangles = []  # one dict per grid cell

    def reset(self, seed=None, options=None):
        """Start a new episode with no cuts decided.

        Args:
            seed: Forwarded to ``gym.Env.reset`` to seed the base RNG.
            options: Unused.

        Returns:
            A fresh copy of the all-zero observation vector.
        """
        super().reset(seed=seed)

        self.phase = 0
        self.partition_step = 0
        self.partition_values = np.zeros(self.CUT_NUM, dtype=np.float32)
        self.col_cuts = []
        self.row_cuts = []
        self.rectangles = []

        # NOTE(review): the Gymnasium API specifies ``(observation, info)``
        # as the return value; the bare observation is kept here so existing
        # callers keep working. Confirm before wrapping this env in
        # Gymnasium-compliant tooling.
        # A copy is returned so later in-place writes in step() cannot
        # alter an observation the caller has already stored.
        return self.partition_values.copy()

    def step(self, action):
        """Record one cut; on the last cut, evaluate the whole partition.

        Args:
            action: Indexable whose first element is the cut position.

        Returns:
            ``(state, reward, terminated, truncated, info)``. Before the last
            cut the reward is 0.0 and ``info`` is ``{}``. On the last cut the
            episode terminates: an infeasible partition yields reward -100
            and ``info == {}``; otherwise the reward is
            ``BASE_LINE - best_time`` and the info slot carries ``best_path``
            as returned by the solver.

        Raises:
            IndexError: If called again after the episode has terminated
                without an intervening ``reset()``.
        """
        self.partition_values[self.partition_step] = float(action[0])
        self.partition_step += 1

        # Snapshot rather than alias: partition_values is mutated in place
        # on every step, so handing out the array itself would retroactively
        # change observations the caller stored earlier.
        state = self.partition_values.copy()

        if self.partition_step < self.CUT_NUM:
            return state, 0.0, False, False, {}

        # All cuts decided: drop zeros ("no cut"), de-duplicate and sort.
        rows = sorted(
            {v for v in self.partition_values[:self.ROW_CUT_LIMIT] if v > 0})
        cols = sorted(
            {v for v in self.partition_values[self.ROW_CUT_LIMIT:] if v > 0})

        # Boundaries always include both edges of the unit interval.
        self.row_cuts = [0.0] + rows + [1.0]
        self.col_cuts = [0.0] + cols + [1.0]

        if not self._build_rectangles():
            return state, -100, True, False, {}

        # Route planning. ``cities`` is a 2 x N matrix of coordinates:
        # column 0 is the depot (area centre), followed by one column per
        # rectangle centre, followed by num_cars - 1 extra depot copies whose
        # column indices are passed to the solver as ``center_idx``.
        rect_centers = [rec['center'] for rec in self.rectangles]
        extra_depots = [self.center] * (self.num_cars - 1)
        cities = np.column_stack([self.center, *rect_centers, *extra_depots])

        first_extra = 1 + len(rect_centers)
        center_idx = list(range(first_extra, first_extra + len(extra_depots)))

        tsp = mTSP(params=self.params, num_cities=cities.shape[1],
                   cities=cities, num_cars=self.num_cars,
                   center_idx=center_idx, rectangles=self.rectangles)
        best_time, best_path = tsp.train(10000)

        reward = self.BASE_LINE - best_time

        # NOTE(review): Gymnasium expects a dict in the info slot; best_path
        # is returned as-is to stay compatible with existing callers.
        return state, reward, True, False, best_path

    def _build_rectangles(self):
        """Append one record per grid cell to ``self.rectangles``.

        For each cell bounded by consecutive row/column cuts this computes
        its area ``d``, the task-offloading ratio ``rho`` (the smaller of the
        time bound and the energy bound) and the derived flight and
        base-station times.

        Returns:
            bool: ``False`` as soon as a cell's energy bound is negative
            (cells after it are not processed), ``True`` otherwise.
        """
        # The time bound does not depend on the cell, so compute it once.
        rho_time_limit = (self.flight_time_factor - self.trans_time_factor) / \
            (self.comp_time_factor - self.trans_time_factor)

        for r_lo, r_hi in zip(self.row_cuts, self.row_cuts[1:]):
            for c_lo, c_hi in zip(self.col_cuts, self.col_cuts[1:]):
                # Cell area in absolute units.
                d = (c_hi - c_lo) * self.W * (r_hi - r_lo) * self.H

                rho_energy_limit = (
                    self.battery_energy_capacity
                    - self.flight_energy_factor * d
                    - self.trans_energy_factor * d
                ) / (self.comp_energy_factor * d - self.trans_energy_factor * d)

                # A negative energy bound marks the partition as infeasible.
                if rho_energy_limit < 0:
                    return False

                rho = min(rho_time_limit, rho_energy_limit)

                self.rectangles.append({
                    'center': ((r_lo + r_hi) * self.H / 2,
                               (c_hi + c_lo) * self.W / 2),
                    'flight_time': self.flight_time_factor * d,
                    'bs_time': self.bs_time_factor * (1 - rho) * d,
                })

        return True

    def render(self):
        """Print a textual summary for phase 1 or phase 2; otherwise do nothing.

        NOTE(review): nothing in this class sets ``phase`` to 1 or 2 or
        assigns ``car_pos`` / ``car_traj``; missing attributes are printed as
        ``None`` instead of raising ``AttributeError``.
        """
        phase = getattr(self, 'phase', None)
        if phase == 1:
            print("Phase 1: Initialize maze environment.")
            print(f"Partition values so far: {self.partition_values}")
            print(f"Motorcade positon: {getattr(self, 'car_pos', None)}")
        elif phase == 2:
            print("Phase 2: Play maze.")
            print(f'Motorcade trajectory: {getattr(self, "car_traj", None)}')
|