Add PPO code

This commit is contained in:
weixin_46229132 2025-03-11 16:01:07 +08:00
parent e7a4395340
commit 1058f37be6
7 changed files with 1003 additions and 0 deletions

3
.gitignore vendored

@@ -7,6 +7,9 @@ __pycache__/
# C extensions
*.so
# Pytorch weights
weights/
# Distribution / packaging
.Python
build/

27
PPO/arguments.py Normal file

@@ -0,0 +1,27 @@
"""
This file contains the arguments to parse at command line.
File main.py will call get_args, which then the arguments
will be returned.
"""
import argparse
def get_args():
"""
Description:
Parses arguments at command line.
Parameters:
None
Return:
args - the arguments parsed
"""
parser = argparse.ArgumentParser()
parser.add_argument('--mode', dest='mode', type=str, default='train') # can be 'train' or 'test'
parser.add_argument('--actor_model', dest='actor_model', type=str, default='') # your actor model filename
parser.add_argument('--critic_model', dest='critic_model', type=str, default='') # your critic model filename
args = parser.parse_args()
return args
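
For orientation, a minimal sketch of how the parsed flags look when consumed (illustrative only; with no flags passed, the defaults above give mode='train' and empty model paths):

# Illustrative usage sketch, not part of the commit.
from arguments import get_args

args = get_args()
print(args.mode)           # 'train' unless --mode test is passed
print(args.actor_model)    # '' unless --actor_model <path> is passed
print(args.critic_model)   # '' unless --critic_model <path> is passed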

295
PPO/env.py Normal file

@@ -0,0 +1,295 @@
import gymnasium as gym
from gymnasium import spaces
import numpy as np
class PartitionMazeEnv(gym.Env):
"""
Custom environment with two phases:
Phase 0 (region partitioning): 4 steps, each outputting one scalar that fixes a vertical or horizontal cut position.
Cut order: step 1 outputs c₁, step 2 outputs c₂, step 3 outputs r₁, step 4 outputs r₂.
After discretization the values are restricted to {0, 0.1, 0.2, ..., 0.9}, where 0 means "no cut".
Phase 1 (vehicle path planning, i.e. maze walking): the vehicles start from the region center and move up/down/left/right
within the grid of sub-regions until every target cell is covered or the step limit is reached.
"""
def __init__(self, config=None):
super(PartitionMazeEnv, self).__init__()
# Fleet / system parameter settings
self.H = 20 # Region height; grid points are 25 m apart (unit distance)
self.W = 30 # Region width
self.num_cars = 2 # Number of systems (car-nest-UAV systems)
# Time coefficients (unit: seconds; one photo per grid cell)
self.flight_time_factor = 3 # Flight time per photo: the UAV flies at 9.5 m/s and takes a photo every 3 s
self.comp_uav_factor = 5 # On-UAV computation time per photo, 5 s
self.trans_time_factor = 0.3 # Transmission time per photo, 0.3 s
self.car_move_time_factor = 2 * 50 # TODO: car travel time per unit distance is 2 s; a scaling factor has been applied
self.comp_bs_factor = 5 # Computation time per photo at the nest (base station)
# Energy parameters
self.flight_energy_factor = 0.05 # Unit: minutes per photo
self.comp_energy_factor = 0.05 # Computation energy; needs to be re-estimated
self.trans_energy_factor = 0.0025
self.battery_capacity = 10 # UAV endurance is 30 minutes of flight only
self.phase = 0 # Phase control: 0 = region partitioning, 1 = maze initialization, 2 = maze walking
self.partition_step = 0 # Step counter for the partitioning phase, range 0-4
# TODO: the number of cuts is currently fixed at 4 (2 + 2)
self.partition_values = np.zeros(
4, dtype=np.float32) # Stores c₁, c₂, r₁, r₂
# Action space: every action is a 1-D continuous value in [0, 1]
self.action_space = spaces.Box(
low=0.0, high=1.0, shape=(1,), dtype=np.float32)
# Observation space: an 8-dimensional vector
# TODO: the returned state currently only contains position coordinates
# Phase-0 state: the first 4 dims are the cut values decided so far (undecided entries are 0)
# Phase-1 state: vehicle positions (2D)
self.observation_space = spaces.Box(
low=0.0, high=1.0, shape=(8,), dtype=np.float32)
# Partitioning-phase variables
self.vertical_cuts = [] # Vertical cut positions (c₁, c₂); a value of 0 means no cut
self.horizontal_cuts = [] # Horizontal cut positions (r₁, r₂)
# TODO: can region_centers be simplified to reduce the number of parameters?
self.region_centers = [] # Center of each sub-region after partitioning (normalized coordinates)
# Path-planning-phase variables
self.MAX_STEPS = 50 # Step limit for maze walking
self.step_count = 0
self.rectangles = {}
self.car_pos = [[0.5, 0.5] for _ in range(self.num_cars)]
self.car_traj = [[] for _ in range(self.num_cars)]
self.current_car_index = 0
def reset(self, seed=None, options=None):
# Reset all variables and return to the partitioning phase (phase 0)
self.phase = 0
self.partition_step = 0
self.partition_values = np.zeros(4, dtype=np.float32)
self.vertical_cuts = []
self.horizontal_cuts = []
self.region_centers = []
self.step_count = 0
self.rectangles = {}
self.car_pos = [[0.5, 0.5] for _ in range(self.num_cars)]
self.car_traj = [[] for _ in range(self.num_cars)]
self.current_car_index = 0
# State: the first 4 dims are partition_values, the rest are padded with 0
state = np.concatenate(
[self.partition_values, np.zeros(np.array(self.car_pos).flatten().shape[0], dtype=np.float32)])
return state, {}
def step(self, action):
# In every phase the action is a 1-D continuous value; take action[0]
a = float(action[0])
if self.phase == 0:
# Partitioning phase: each step outputs one scalar, discretized to {0, 0.1, ..., 0.9}
disc_val = np.floor(a * 10) / 10.0
disc_val = np.clip(disc_val, 0.0, 0.9)
self.partition_values[self.partition_step] = disc_val
self.partition_step += 1
# Build the current state: the first partition_step entries hold the decided values, the rest are 0, padded with zeros for the vehicle positions
state = np.concatenate(
[self.partition_values, np.zeros(np.array(self.car_pos).flatten().shape[0], dtype=np.float32)])
# If fewer than 4 steps have been taken we are still partitioning: no reward, done is False
if self.partition_step < 4:
return state, 0.0, False, False, {}
else:
# After all 4 steps, compute the cut boundaries
# Filter out zeros, deduplicate, and sort
vert = sorted(set(v for v in self.partition_values[:len(
self.partition_values) // 2] if v > 0))
horiz = sorted(set(v for v in self.partition_values[len(
self.partition_values) // 2:] if v > 0))
self.vertical_cuts = vert if vert else []
self.horizontal_cuts = horiz if horiz else []
# Boundaries always include 0 and 1
v_boundaries = [0.0] + self.vertical_cuts + [1.0]
h_boundaries = [0.0] + self.horizontal_cuts + [1.0]
# Check whether the partition is feasible and compute the task offloading ratio rho for each region
valid_partition = True
for i in range(len(h_boundaries) - 1):
for j in range(len(v_boundaries) - 1):
d = (v_boundaries[j+1] - v_boundaries[j]) * self.W * \
(h_boundaries[i+1] - h_boundaries[i]) * self.H
rho_time_limit = (self.flight_time_factor - self.trans_time_factor) / \
(self.comp_uav_factor - self.trans_time_factor)
rho_energy_limit = (self.battery_capacity - self.flight_energy_factor * d - self.trans_energy_factor * d) / \
(self.comp_energy_factor * d -
self.trans_energy_factor * d)
if rho_energy_limit < 0:
valid_partition = False
break
rho = min(rho_time_limit, rho_energy_limit)
flight_time = self.flight_time_factor * d
comp_time = self.comp_uav_factor * rho * d
trans_time = self.trans_time_factor * (1 - rho) * d
comp_bs_time = self.comp_bs_factor * (1 - rho) * d
self.rectangles[(i, j)] = {
# 'r1': h_boundaries[i], 'r2': h_boundaries[i+1], 'c1': v_boundaries[j], 'c2': v_boundaries[j+1],
'd': d,
'rho': rho,
'flight_time': flight_time,
'comp_time': comp_time,
'trans_time': trans_time,
'comp_bs_time': comp_bs_time,
'is_visited': False
# 'center': (center_r, center_c)
}
if not valid_partition:
break
if not valid_partition:
reward = -100
state = np.concatenate(
[self.partition_values, np.zeros(np.array(self.car_pos).flatten().shape[0], dtype=np.float32)])
return state, reward, True, False, {}
else:
reward = 10
# Enter phase 1: maze walking
self.phase = 1
# Compute the center of each sub-region from the cut boundaries
self.region_centers = []
for i in range(len(h_boundaries) - 1):
for j in range(len(v_boundaries) - 1):
center_x = (
v_boundaries[j] + v_boundaries[j+1]) / 2.0
center_y = (
h_boundaries[i] + h_boundaries[i+1]) / 2.0
self.region_centers.append((center_x, center_y))
# Store the cut boundaries for later grid mapping
self.v_boundaries = v_boundaries
self.h_boundaries = h_boundaries
# Initialize the maze phase: reset the step counter and build visited_grid with one entry per grid cell
self.step_count = 0
self.visited_grid = np.zeros(
(len(v_boundaries) - 1) * (len(h_boundaries) - 1), dtype=np.int32)
state = np.concatenate(
[self.partition_values, np.array(self.car_pos).flatten()])
return state, reward, False, False, {}
elif self.phase == 1:
# Phase 1: maze initialization; send the vehicles from the region center to the center of a chosen sub-region
# The action value lies in [0, 1]; map it to an index in 0..(num_regions - 1)
num_regions = len(self.region_centers)
target_region_index = int(np.floor(a * num_regions))
target_region_index = np.clip(
target_region_index, 0, num_regions - 1)
# Move every vehicle to the chosen target sub-region
for car_idx in range(self.num_cars):
target_position = np.array(
self.region_centers[target_region_index]) # target region center
# Update this vehicle's position
self.car_pos[car_idx] = target_position
# Accumulate the step count
self.step_count += 1
self.car_traj[car_idx].append(target_position) # record each vehicle's trajectory
# Enter phase 2: maze walking
self.phase = 2
# Observation / state
state = np.concatenate(
[self.partition_values, np.array(self.car_pos).flatten()])
return state, 0.0, False, False, {}
elif self.phase == 2:
# Phase 2: path planning (maze walking)
current_car = self.current_car_index
# The 1-D continuous action a is mapped to four movement directions (or 'stay')
if a < 0.2:
move_dir = 'up'
elif a < 0.4:
move_dir = 'down'
elif a < 0.6:
move_dir = 'left'
elif a < 0.8:
move_dir = 'right'
else:
move_dir = 'stay'
current_row, current_col = self.car_pos[current_car]
# Initialize the new row/column to the current values
new_row, new_col = current_row, current_col
if move_dir == 'up' and current_row < len(self.h_boundaries) - 2:
new_row = current_row + 1
elif move_dir == 'down' and current_row > 0:
new_row = current_row - 1
elif move_dir == 'left' and current_col > 0:
new_col = current_col - 1
elif move_dir == 'right' and current_col < len(self.v_boundaries) - 2:
new_col = current_col + 1
# If the move is invalid or the action is 'stay', keep the current position
# TODO: add a penalty for invalid moves
# Update the vehicle position
self.car_pos[current_car] = [new_row, new_col]
if new_row != current_row or new_col != current_col:
self.car_traj[current_car].append(np.array([new_row, new_col]))
self.step_count += 1
self.current_car_index = (
self.current_car_index + 1) % self.num_cars
# Update the visited flag: mark the new cell as visited
self.rectangles[(new_row, new_col)]['is_visited'] = True
# Observation / state
state = np.concatenate(
[self.partition_values, np.array(self.car_pos).flatten()])
# Episode termination: every cell has been visited or the step limit is reached
reward = 0.0
all_visited = all(rec['is_visited'] for rec in self.rectangles.values())
done = all_visited or (self.step_count >= self.MAX_STEPS)
if done and all_visited:
# Full coverage: compute each motorcade's execution time from its trajectory
T = max([self._compute_motorcade_time(idx)
for idx in range(self.num_cars)])
reward += 10.0 # TODO: compare this reward against a greedy baseline
elif done and self.step_count >= self.MAX_STEPS:
reward -= 100
return state, reward, done, False, {}
def _compute_motorcade_time(self, idx):
# NOTE: these lookups assume car_traj stores the same (row, col) keys used in rectangles
flight_time = sum(self.rectangles[point]['flight_time']
for point in self.car_traj[idx])
bs_time = sum(self.rectangles[point]['comp_bs_time']
for point in self.car_traj[idx])
# Compute the car travel time; first add the overall region center to both ends of the trajectory
self.car_traj[idx].append([0.5, 0.5])
self.car_traj[idx].insert(0, [0.5, 0.5])
car_time = 0.0
for i in range(len(self.car_traj[idx]) - 1):
first_point = np.array(self.car_traj[idx][i])
second_point = np.array(self.car_traj[idx][i + 1])
car_time += np.linalg.norm(first_point - second_point) * \
self.H * self.W * self.car_move_time_factor
return max(car_time + flight_time, bs_time)
def render(self):
if self.phase == 0:
print("Phase 0: Partitioning.")
print(f"Partition step: {self.partition_step}")
print(f"Partition values so far: {self.partition_values}")
elif self.phase == 1:
print("Phase 1: Path planning (maze).")
print(f"Visited grid: {self.visited_grid}")
print(f"Step count: {self.step_count}")

103
PPO/eval_policy.py Normal file

@@ -0,0 +1,103 @@
"""
This file is used only to evaluate our trained policy/actor after
training in main.py with ppo.py. I wrote this file to demonstrate
that our trained policy exists independently of our learning algorithm,
which resides in ppo.py. Thus, we can test our trained policy without
relying on ppo.py.
"""
def _log_summary(ep_len, ep_ret, ep_num):
"""
Print to stdout what we've logged so far in the most recent episode.
Parameters:
ep_len - the episodic length
ep_ret - the episodic return
ep_num - the episode number
Return:
None
"""
# Round decimal places for more aesthetic logging messages
ep_len = str(round(ep_len, 2))
ep_ret = str(round(ep_ret, 2))
# Print logging statements
print(flush=True)
print(f"-------------------- Episode #{ep_num} --------------------", flush=True)
print(f"Episodic Length: {ep_len}", flush=True)
print(f"Episodic Return: {ep_ret}", flush=True)
print(f"------------------------------------------------------", flush=True)
print(flush=True)
def rollout(policy, env, render):
"""
Returns a generator to roll out each episode given a trained policy and
environment to test on.
Parameters:
policy - The trained policy to test
env - The environment to evaluate the policy on
render - Specifies whether to render or not
Return:
A generator object rollout, or iterable, which will return the latest
episodic length and return on each iteration of the generator.
Note:
If you're unfamiliar with Python generators, check this out:
https://wiki.python.org/moin/Generators
If you're unfamiliar with Python "yield", check this out:
https://stackoverflow.com/questions/231767/what-does-the-yield-keyword-do
"""
# Rollout until user kills process
while True:
obs, _ = env.reset()
done = False
# number of timesteps so far
t = 0
# Logging data
ep_len = 0 # episodic length
ep_ret = 0 # episodic return
while not done:
t += 1
# Render environment if specified, off by default
if render:
env.render()
# Query deterministic action from policy and run it
action = policy(obs).detach().numpy()
obs, rew, terminated, truncated, _ = env.step(action)
done = terminated | truncated
# Sum all episodic rewards as we go along
ep_ret += rew
# Track episodic length
ep_len = t
# returns episodic length and return in this iteration
yield ep_len, ep_ret
def eval_policy(policy, env, render=False):
"""
The main function to evaluate our policy with. It will iterate a generator object
"rollout", which will simulate each episode and return the most recent episode's
length and return. We can then log it right after. And yes, eval_policy will run
forever until you kill the process.
Parameters:
policy - The trained policy to test, basically another name for our actor model
env - The environment to test the policy on
render - Whether we should render our episodes. False by default.
Return:
None
NOTE: To learn more about generators, look at rollout's function description
"""
# Rollout with the policy and environment, and log each episode's data
for ep_num, (ep_len, ep_ret) in enumerate(rollout(policy, env, render)):
_log_summary(ep_len=ep_len, ep_ret=ep_ret, ep_num=ep_num)

123
PPO/main.py Normal file

@@ -0,0 +1,123 @@
"""
This file is the executable for running PPO. It is based on this medium article:
https://medium.com/@eyyu/coding-ppo-from-scratch-with-pytorch-part-1-4-613dfc1b14c8
"""
import gymnasium as gym
import sys
import torch
from arguments import get_args
from ppo import PPO
from network import FeedForwardNN
from eval_policy import eval_policy
from env import PartitionMazeEnv
def train(env, hyperparameters, actor_model, critic_model):
"""
Trains the model.
Parameters:
env - the environment to train on
hyperparameters - a dict of hyperparameters to use, defined in main
actor_model - the actor model to load in if we want to continue training
critic_model - the critic model to load in if we want to continue training
Return:
None
"""
print(f"Training", flush=True)
# Create a model for PPO.
model = PPO(policy_class=FeedForwardNN, env=env, **hyperparameters)
# Tries to load in an existing actor/critic model to continue training on
if actor_model != '' and critic_model != '':
print(f"Loading in {actor_model} and {critic_model}...", flush=True)
model.actor.load_state_dict(torch.load(actor_model))
model.critic.load_state_dict(torch.load(critic_model))
print(f"Successfully loaded.", flush=True)
elif actor_model != '' or critic_model != '': # Don't train from scratch if user accidentally forgets actor/critic model
print(f"Error: Either specify both actor/critic models or none at all. We don't want to accidentally override anything!")
sys.exit(0)
else:
print(f"Training from scratch.", flush=True)
# Train the PPO model with a specified total timesteps
# NOTE: You can change the total timesteps here, I put a big number just because
# you can kill the process whenever you feel like PPO is converging
model.learn(total_timesteps=200_000_000)
def test(env, actor_model):
"""
Tests the model.
Parameters:
env - the environment to test the policy on
actor_model - the actor model to load in
Return:
None
"""
print(f"Testing {actor_model}", flush=True)
# If the actor model is not specified, then exit
if actor_model == '':
print(f"Didn't specify model file. Exiting.", flush=True)
sys.exit(0)
# Extract out dimensions of observation and action spaces
obs_dim = env.observation_space.shape[0]
act_dim = env.action_space.shape[0]
# Build our policy the same way we build our actor model in PPO
policy = FeedForwardNN(obs_dim, act_dim)
# Load in the actor model saved by the PPO algorithm
policy.load_state_dict(torch.load(actor_model))
# Evaluate our policy with a separate module, eval_policy, to demonstrate
# that once we are done training the model/policy with ppo.py, we no longer need
# ppo.py since it only contains the training algorithm. The model/policy itself exists
# independently as a binary file that can be loaded in with torch.
eval_policy(policy=policy, env=env, render=True)
def main(args):
"""
The main function to run.
Parameters:
args - the arguments parsed from command line
Return:
None
"""
# NOTE: Here's where you can set hyperparameters for PPO. I don't include them as part of
# ArgumentParser because it's too annoying to type them every time at command line. Instead, you can change them here.
# To see a list of hyperparameters, look in ppo.py at function _init_hyperparameters
hyperparameters = {
'timesteps_per_batch': 2048,
'max_timesteps_per_episode': 200,
'gamma': 0.99,
'n_updates_per_iteration': 10,
'lr': 3e-4,
'clip': 0.2,
'render': True,
'render_every_i': 10
}
# Creates the environment we'll be running. If you want to replace with your own
# custom environment, note that it must inherit Gym and have both continuous
# observation and action spaces.
# env = gym.make('Pendulum-v1', render_mode='human' if args.mode == 'test' else 'rgb_array')
env = PartitionMazeEnv()
# Train or test, depending on the mode specified
if args.mode == 'train':
train(env=env, hyperparameters=hyperparameters, actor_model=args.actor_model, critic_model=args.critic_model)
else:
test(env=env, actor_model=args.actor_model)
if __name__ == '__main__':
args = get_args() # Parse arguments from command line
main(args)
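
For reference (assuming commands are run from the PPO/ directory, since the imports above are relative): "python main.py --mode train" trains from scratch, "python main.py --mode train --actor_model ./weights/ppo_actor.pth --critic_model ./weights/ppo_critic.pth" resumes from saved weights, and "python main.py --mode test --actor_model ./weights/ppo_actor.pth" evaluates a trained actor. The weight paths are illustrative; they match the save paths used by learn() in ppo.py.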

50
PPO/network.py Normal file

@@ -0,0 +1,50 @@
"""
This file contains a neural network module for us to
define our actor and critic networks in PPO.
"""
import torch
from torch import nn
import torch.nn.functional as F
import numpy as np
class FeedForwardNN(nn.Module):
"""
A standard in_dim-64-64-out_dim Feed Forward Neural Network.
"""
def __init__(self, in_dim, out_dim):
"""
Initialize the network and set up the layers.
Parameters:
in_dim - input dimensions as an int
out_dim - output dimensions as an int
Return:
None
"""
super(FeedForwardNN, self).__init__()
self.layer1 = nn.Linear(in_dim, 64)
self.layer2 = nn.Linear(64, 64)
self.layer3 = nn.Linear(64, out_dim)
def forward(self, obs):
"""
Runs a forward pass on the neural network.
Parameters:
obs - observation to pass as input
Return:
output - the output of our forward pass
"""
# Convert observation to tensor if it's a numpy array
if isinstance(obs, np.ndarray):
obs = torch.tensor(obs, dtype=torch.float)
activation1 = F.relu(self.layer1(obs))
activation2 = F.relu(self.layer2(activation1))
output = self.layer3(activation2)
return output
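
As a small illustrative check (not part of the commit), the actor can be built the same way test() in main.py does, using the PartitionMazeEnv dimensions (8-D observation, 1-D action):

# Illustrative sketch: instantiate the network with the env's dimensions.
import numpy as np
from network import FeedForwardNN

policy = FeedForwardNN(in_dim=8, out_dim=1)
dummy_obs = np.zeros(8, dtype=np.float32)  # placeholder observation
mean_action = policy(dummy_obs)            # forward pass returns a torch tensor
print(mean_action.shape)                   # torch.Size([1])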

402
PPO/ppo.py Normal file

@@ -0,0 +1,402 @@
"""
The file contains the PPO class to train with.
NOTE: All "ALG STEP"s are following the numbers from the original PPO pseudocode.
It can be found here: https://spinningup.openai.com/en/latest/_images/math/e62a8971472597f4b014c2da064f636ffe365ba3.svg
"""
import os
import time
import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
from torch.optim import Adam
from torch.distributions import MultivariateNormal
class PPO:
"""
This is the PPO class we will use as our model in main.py
"""
def __init__(self, policy_class, env, **hyperparameters):
"""
Initializes the PPO model, including hyperparameters.
Parameters:
policy_class - the policy class to use for our actor/critic networks.
env - the environment to train on.
hyperparameters - all extra arguments passed into PPO that should be hyperparameters.
Returns:
None
"""
# Make sure the environment is compatible with our code
assert(type(env.observation_space) == gym.spaces.Box)
assert(type(env.action_space) == gym.spaces.Box)
# Initialize hyperparameters for training with PPO
self._init_hyperparameters(hyperparameters)
# Extract environment information
self.env = env
self.obs_dim = env.observation_space.shape[0]
self.act_dim = env.action_space.shape[0]
# Initialize actor and critic networks
self.actor = policy_class(self.obs_dim, self.act_dim) # ALG STEP 1
self.critic = policy_class(self.obs_dim, 1)
# Initialize optimizers for actor and critic
self.actor_optim = Adam(self.actor.parameters(), lr=self.lr)
self.critic_optim = Adam(self.critic.parameters(), lr=self.lr)
# Initialize the covariance matrix used to query the actor for actions
self.cov_var = torch.full(size=(self.act_dim,), fill_value=0.5)
self.cov_mat = torch.diag(self.cov_var)
# This logger will help us with printing out summaries of each iteration
self.logger = {
'delta_t': time.time_ns(),
't_so_far': 0, # timesteps so far
'i_so_far': 0, # iterations so far
'batch_lens': [], # episodic lengths in batch
'batch_rews': [], # episodic returns in batch
'actor_losses': [], # losses of actor network in current iteration
}
def learn(self, total_timesteps):
"""
Train the actor and critic networks. Here is where the main PPO algorithm resides.
Parameters:
total_timesteps - the total number of timesteps to train for
Return:
None
"""
print(f"Learning... Running {self.max_timesteps_per_episode} timesteps per episode, ", end='')
print(f"{self.timesteps_per_batch} timesteps per batch for a total of {total_timesteps} timesteps")
t_so_far = 0 # Timesteps simulated so far
i_so_far = 0 # Iterations ran so far
while t_so_far < total_timesteps: # ALG STEP 2
# Autobots, roll out (just kidding, we're collecting our batch simulations here)
batch_obs, batch_acts, batch_log_probs, batch_rtgs, batch_lens = self.rollout() # ALG STEP 3
# Calculate how many timesteps we collected this batch
t_so_far += np.sum(batch_lens)
# Increment the number of iterations
i_so_far += 1
# Logging timesteps so far and iterations so far
self.logger['t_so_far'] = t_so_far
self.logger['i_so_far'] = i_so_far
# Calculate advantage at k-th iteration
V, _ = self.evaluate(batch_obs, batch_acts)
A_k = batch_rtgs - V.detach() # ALG STEP 5
# One of the only tricks I use that isn't in the pseudocode. Normalizing advantages
# isn't theoretically necessary, but in practice it decreases the variance of
# our advantages and makes convergence much more stable and faster. I added this because
# solving some environments was too unstable without it.
A_k = (A_k - A_k.mean()) / (A_k.std() + 1e-10)
# This is the loop where we update our network for some n epochs
for _ in range(self.n_updates_per_iteration): # ALG STEP 6 & 7
# Calculate V_phi and pi_theta(a_t | s_t)
V, curr_log_probs = self.evaluate(batch_obs, batch_acts)
# Calculate the ratio pi_theta(a_t | s_t) / pi_theta_k(a_t | s_t)
# NOTE: we just subtract the logs, which is the same as
# dividing the values and then canceling the log with e^log.
# For why we use log probabilities instead of actual probabilities,
# here's a great explanation:
# https://cs.stackexchange.com/questions/70518/why-do-we-use-the-log-in-gradient-based-reinforcement-algorithms
# TL;DR makes gradient ascent easier behind the scenes.
ratios = torch.exp(curr_log_probs - batch_log_probs)
# Calculate surrogate losses.
surr1 = ratios * A_k
surr2 = torch.clamp(ratios, 1 - self.clip, 1 + self.clip) * A_k
# Calculate actor and critic losses.
# NOTE: we take the negative min of the surrogate losses because we're trying to maximize
# the performance function, but Adam minimizes the loss. So minimizing the negative
# performance function maximizes it.
actor_loss = (-torch.min(surr1, surr2)).mean()
critic_loss = nn.MSELoss()(V, batch_rtgs)
# Calculate gradients and perform backward propagation for actor network
self.actor_optim.zero_grad()
actor_loss.backward(retain_graph=True)
self.actor_optim.step()
# Calculate gradients and perform backward propagation for critic network
self.critic_optim.zero_grad()
critic_loss.backward()
self.critic_optim.step()
# Log actor loss
self.logger['actor_losses'].append(actor_loss.detach())
# Print a summary of our training so far
self._log_summary()
# Save our model if it's time
if i_so_far % self.save_freq == 0:
# Make sure the (gitignored) weights directory exists before saving
os.makedirs('./weights', exist_ok=True)
torch.save(self.actor.state_dict(), './weights/ppo_actor.pth')
torch.save(self.critic.state_dict(), './weights/ppo_critic.pth')
def rollout(self):
"""
Too many transformers references, I'm sorry. This is where we collect the batch of data
from simulation. Since this is an on-policy algorithm, we'll need to collect a fresh batch
of data each time we iterate the actor/critic networks.
Parameters:
None
Return:
batch_obs - the observations collected this batch. Shape: (number of timesteps, dimension of observation)
batch_acts - the actions collected this batch. Shape: (number of timesteps, dimension of action)
batch_log_probs - the log probabilities of each action taken this batch. Shape: (number of timesteps)
batch_rtgs - the Rewards-To-Go of each timestep in this batch. Shape: (number of timesteps)
batch_lens - the lengths of each episode this batch. Shape: (number of episodes)
"""
# Batch data. For more details, check function header.
batch_obs = []
batch_acts = []
batch_log_probs = []
batch_rews = []
batch_rtgs = []
batch_lens = []
# Episodic data. Keeps track of rewards per episode, will get cleared
# upon each new episode
ep_rews = []
t = 0 # Keeps track of how many timesteps we've run so far this batch
# Keep simulating until we've run more than or equal to specified timesteps per batch
while t < self.timesteps_per_batch:
ep_rews = [] # rewards collected per episode
# Reset the environment. Note that obs is short for observation.
obs, _ = self.env.reset()
done = False
# Run an episode for a maximum of max_timesteps_per_episode timesteps
for ep_t in range(self.max_timesteps_per_episode):
# If render is specified, render the environment
if self.render and (self.logger['i_so_far'] % self.render_every_i == 0) and len(batch_lens) == 0:
self.env.render()
t += 1 # Increment timesteps ran this batch so far
# Track observations in this batch
batch_obs.append(obs)
# Calculate action and make a step in the env.
# Note that rew is short for reward.
action, log_prob = self.get_action(obs)
obs, rew, terminated, truncated, _ = self.env.step(action)
# Don't really care about the difference between terminated or truncated in this, so just combine them
done = terminated | truncated
# Track recent reward, action, and action log probability
ep_rews.append(rew)
batch_acts.append(action)
batch_log_probs.append(log_prob)
# If the environment tells us the episode is terminated, break
if done:
break
# Track episodic lengths and rewards
batch_lens.append(ep_t + 1)
batch_rews.append(ep_rews)
# Reshape data as tensors in the shape specified in function description, before returning
batch_obs = torch.tensor(batch_obs, dtype=torch.float)
batch_acts = torch.tensor(batch_acts, dtype=torch.float)
batch_log_probs = torch.tensor(batch_log_probs, dtype=torch.float)
batch_rtgs = self.compute_rtgs(batch_rews) # ALG STEP 4
# Log the episodic returns and episodic lengths in this batch.
self.logger['batch_rews'] = batch_rews
self.logger['batch_lens'] = batch_lens
return batch_obs, batch_acts, batch_log_probs, batch_rtgs, batch_lens
def compute_rtgs(self, batch_rews):
"""
Compute the Reward-To-Go of each timestep in a batch given the rewards.
Parameters:
batch_rews - the rewards in a batch, Shape: (number of episodes, number of timesteps per episode)
Return:
batch_rtgs - the rewards to go, Shape: (number of timesteps in batch)
"""
# The rewards-to-go (rtg) per episode per batch to return.
# The shape will be (num timesteps per episode)
batch_rtgs = []
# Iterate through each episode
for ep_rews in reversed(batch_rews):
discounted_reward = 0 # The discounted reward so far
# Iterate through all rewards in the episode. We go backwards for smoother calculation of each
# discounted return (think about why it would be harder starting from the beginning)
for rew in reversed(ep_rews):
discounted_reward = rew + discounted_reward * self.gamma
batch_rtgs.insert(0, discounted_reward)
# Convert the rewards-to-go into a tensor
batch_rtgs = torch.tensor(batch_rtgs, dtype=torch.float)
return batch_rtgs
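# Worked example (illustrative, not part of the class): with gamma = 0.95 and a
# single episode with rewards [1, 2, 3], iterating backwards inserts 3, then
# 2 + 0.95 * 3 = 4.85, then 1 + 0.95 * 4.85 = 5.6075 at the front, giving
# batch_rtgs = [5.6075, 4.85, 3.0].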
def get_action(self, obs):
"""
Queries an action from the actor network, should be called from rollout.
Parameters:
obs - the observation at the current timestep
Return:
action - the action to take, as a numpy array
log_prob - the log probability of the selected action in the distribution
"""
# Query the actor network for a mean action
mean = self.actor(obs)
# Create a distribution with the mean action and std from the covariance matrix above.
# For more information on how this distribution works, check out Andrew Ng's lecture on it:
# https://www.youtube.com/watch?v=JjB58InuTqM
dist = MultivariateNormal(mean, self.cov_mat)
# Sample an action from the distribution
action = dist.sample()
# Calculate the log probability for that action
log_prob = dist.log_prob(action)
# Return the sampled action and the log probability of that action in our distribution
return action.detach().numpy(), log_prob.detach()
def evaluate(self, batch_obs, batch_acts):
"""
Estimate the values of each observation, and the log probs of
each action in the most recent batch with the most recent
iteration of the actor network. Should be called from learn.
Parameters:
batch_obs - the observations from the most recently collected batch as a tensor.
Shape: (number of timesteps in batch, dimension of observation)
batch_acts - the actions from the most recently collected batch as a tensor.
Shape: (number of timesteps in batch, dimension of action)
Return:
V - the predicted values of batch_obs
log_probs - the log probabilities of the actions taken in batch_acts given batch_obs
"""
# Query critic network for a value V for each batch_obs. Shape of V should be same as batch_rtgs
V = self.critic(batch_obs).squeeze()
# Calculate the log probabilities of batch actions using most recent actor network.
# This segment of code is similar to that in get_action()
mean = self.actor(batch_obs)
dist = MultivariateNormal(mean, self.cov_mat)
log_probs = dist.log_prob(batch_acts)
# Return the value vector V of each observation in the batch
# and log probabilities log_probs of each action in the batch
return V, log_probs
def _init_hyperparameters(self, hyperparameters):
"""
Initialize default and custom values for hyperparameters
Parameters:
hyperparameters - the extra arguments included when creating the PPO model, should only include
hyperparameters defined below with custom values.
Return:
None
"""
# Initialize default values for hyperparameters
# Algorithm hyperparameters
self.timesteps_per_batch = 4800 # Number of timesteps to run per batch
self.max_timesteps_per_episode = 1600 # Max number of timesteps per episode
self.n_updates_per_iteration = 5 # Number of times to update actor/critic per iteration
self.lr = 0.005 # Learning rate of actor optimizer
self.gamma = 0.95 # Discount factor to be applied when calculating Rewards-To-Go
self.clip = 0.2 # Recommended 0.2, helps define the threshold to clip the ratio during SGA
# Miscellaneous parameters
self.render = True # If we should render during rollout
self.render_every_i = 10 # Only render every n iterations
self.save_freq = 10 # How often we save in number of iterations
self.seed = None # Sets the seed of our program, used for reproducibility of results
# Change any default values to custom values for specified hyperparameters
for param, val in hyperparameters.items():
exec('self.' + param + ' = ' + str(val))
# Sets the seed if specified
if self.seed is not None:
# Check if our seed is valid first
assert(type(self.seed) == int)
# Set the seed
torch.manual_seed(self.seed)
print(f"Successfully set seed to {self.seed}")
def _log_summary(self):
"""
Print to stdout what we've logged so far in the most recent batch.
Parameters:
None
Return:
None
"""
# Calculate logging values. I use a few python shortcuts to calculate each value
# without explaining since it's not too important to PPO; feel free to look it over,
# and if you have any questions you can email me (look at bottom of README)
delta_t = self.logger['delta_t']
self.logger['delta_t'] = time.time_ns()
delta_t = (self.logger['delta_t'] - delta_t) / 1e9
delta_t = str(round(delta_t, 2))
t_so_far = self.logger['t_so_far']
i_so_far = self.logger['i_so_far']
avg_ep_lens = np.mean(self.logger['batch_lens'])
avg_ep_rews = np.mean([np.sum(ep_rews) for ep_rews in self.logger['batch_rews']])
avg_actor_loss = np.mean([losses.float().mean() for losses in self.logger['actor_losses']])
# Round decimal places for more aesthetic logging messages
avg_ep_lens = str(round(avg_ep_lens, 2))
avg_ep_rews = str(round(avg_ep_rews, 2))
avg_actor_loss = str(round(avg_actor_loss, 5))
# Print logging statements
print(flush=True)
print(f"-------------------- Iteration #{i_so_far} --------------------", flush=True)
print(f"Average Episodic Length: {avg_ep_lens}", flush=True)
print(f"Average Episodic Return: {avg_ep_rews}", flush=True)
print(f"Average Loss: {avg_actor_loss}", flush=True)
print(f"Timesteps So Far: {t_so_far}", flush=True)
print(f"Iteration took: {delta_t} secs", flush=True)
print(f"------------------------------------------------------", flush=True)
print(flush=True)
# Reset batch-specific logging data
self.logger['batch_lens'] = []
self.logger['batch_rews'] = []
self.logger['actor_losses'] = []