135 lines
4.8 KiB
Python
135 lines
4.8 KiB
Python
import numpy as np
|
|
import gym
|
|
from gym import spaces
|
|
|
|
|
|
class Env(gym.Env):
|
|
"""多车-巢-机系统的区域覆盖环境"""
|
|
|
|
def __init__(self):
|
|
super(Env, self).__init__()
|
|
|
|
# 环境参数
|
|
self.H = 20 # 区域高度
|
|
self.W = 25 # 区域宽度
|
|
self.k = 1 # 系统数量
|
|
|
|
# 时间系数
|
|
self.flight_time_factor = 3 # 每张照片飞行时间
|
|
self.comp_uav_factor = 5 # 无人机计算时间
|
|
self.trans_time_factor = 0.3 # 传输时间
|
|
self.car_move_time_factor = 100 # 汽车移动时间
|
|
self.comp_bs_factor = 5 # 机巢计算时间
|
|
|
|
# 能量参数
|
|
self.flight_energy_factor = 0.05 # 飞行能耗
|
|
self.comp_energy_factor = 0.05 # 计算能耗
|
|
self.trans_energy_factor = 0.0025 # 传输能耗
|
|
self.battery_capacity = 30 # 电池容量
|
|
|
|
# 动作空间
|
|
# [垂直切割数, 水平切割数, 卸载率]
|
|
self.action_space = spaces.Box(
|
|
low=np.array([1, 1, 0]),
|
|
high=np.array([5, 5, 1]),
|
|
dtype=np.float32
|
|
)
|
|
|
|
# 状态空间
|
|
# [当前垂直切割数, 当前水平切割数, 当前最大完成时间]
|
|
self.observation_space = spaces.Box(
|
|
low=np.array([1, 1, 0]),
|
|
high=np.array([5, 5, float('inf')]),
|
|
dtype=np.float32
|
|
)
|
|
|
|
self.state = None
|
|
self.current_step = 0
|
|
self.max_steps = 1000
|
|
|
|
def step(self, action):
|
|
self.current_step += 1
|
|
|
|
# 解析动作
|
|
v_cuts = int(action[0]) # 垂直切割数
|
|
h_cuts = int(action[1]) # 水平切割数
|
|
# rho = action[2] # 卸载率
|
|
|
|
# TODO 生成切割位置,目前是均匀切割
|
|
v_boundaries = np.linspace(0, self.H, v_cuts + 1)
|
|
h_boundaries = np.linspace(0, self.W, h_cuts + 1)
|
|
|
|
# 计算每个子区域的指标
|
|
total_time = 0
|
|
valid_partition = True
|
|
|
|
for i in range(len(v_boundaries) - 1):
|
|
for j in range(len(h_boundaries) - 1):
|
|
# 计算子区域大小
|
|
height = v_boundaries[i+1] - v_boundaries[i]
|
|
width = h_boundaries[j+1] - h_boundaries[j]
|
|
area = height * width
|
|
|
|
# 求解rho
|
|
rho_time_limit = (self.flight_time_factor - self.trans_time_factor) / \
|
|
(self.comp_uav_factor - self.trans_time_factor)
|
|
rho_energy_limit = (self.battery_capacity - self.flight_energy_factor * area - self.trans_energy_factor * area) / \
|
|
(self.comp_energy_factor * area - self.trans_energy_factor * area)
|
|
if rho_energy_limit < 0:
|
|
valid_partition = False
|
|
break
|
|
rho = min(rho_time_limit, rho_energy_limit)
|
|
|
|
# 计算各阶段时间
|
|
flight_time = self.flight_time_factor * area
|
|
comp_time = self.comp_uav_factor * rho * area
|
|
trans_time = self.trans_time_factor * (1 - rho) * area
|
|
comp_bs_time = self.comp_bs_factor * (1 - rho) * area
|
|
|
|
# # 计算能耗
|
|
# flight_energy = self.flight_energy_factor * area
|
|
# comp_energy = self.comp_energy_factor * rho * area
|
|
# trans_energy = self.trans_energy_factor * (1 - rho) * area
|
|
# total_energy = flight_energy + comp_energy + trans_energy
|
|
|
|
# # 检查约束
|
|
# if total_energy > self.battery_capacity or (comp_time + trans_time > flight_time):
|
|
# valid_partition = False
|
|
# break
|
|
|
|
# 计算子区域中心到区域中心的距离
|
|
center_y = (v_boundaries[i] + v_boundaries[i+1]) / 2
|
|
center_x = (h_boundaries[j] + h_boundaries[j+1]) / 2
|
|
dist_to_center = np.sqrt(
|
|
(center_y - self.H/2)**2 + (center_x - self.W/2)**2)
|
|
car_time = dist_to_center * self.car_move_time_factor
|
|
|
|
# 更新总时间
|
|
task_time = max(flight_time + car_time, comp_bs_time)
|
|
total_time = max(total_time, task_time)
|
|
|
|
if not valid_partition:
|
|
break
|
|
|
|
# 计算奖励
|
|
if not valid_partition:
|
|
reward = -10000 # 惩罚无效方案
|
|
done = True
|
|
else:
|
|
reward = -total_time # 负的完成时间作为奖励
|
|
done = self.current_step >= self.max_steps
|
|
|
|
# 更新状态
|
|
self.state = np.array([v_cuts, h_cuts, total_time])
|
|
|
|
return self.state, reward, done, {}
|
|
|
|
def reset(self):
|
|
# 初始化状态
|
|
self.state = np.array([1, 1, 0])
|
|
self.current_step = 0
|
|
return self.state
|
|
|
|
def render(self, mode='human'):
|
|
pass
|