강화학습: 텐서플로 케라스로 Continuous A2C 구현하고 Pendulum 학습시키기
Nov. 3, 2022, 12:37 p.m.
1. Continuous A2C
얼마 전에 A2C 에이전트에 대해서 포스팅을 한 적이 있었는데요, 이번에는 Continuous A2C에 대해서 가볍게 알아보겠습니다.
.
A2C는 행동이 불연속적입니다. 그래서 장점도 있지만 단점이라면 행동의 크기를 조절할 수는 없다는 것입니다. 카트폴 학습 예제에서도 막대가 넘어지지 않기 위해서는 막대가 많이 기울었을때는 빨리 움직여야 하지만, 막대가 가운데에 있을 때는 아주 살짝씩만 움직여주어야 합니다.
.
그렇지만 기존 A2C에이전트는 1 또는 -1 로만 움직일 수 있었기에 이런 미세한 조정을 할 수 없었죠. 만약 -1부터 1까지 실수 범위 내에서 행동을 할 수 있었더라면 어떨까요? 훨씬 미세한 조정으로 더 좋은 성능을 낼 것입니다.
이렇듯 A2C 에이전트가 연속적 행동을 할 수 있도록 수정을 할 수 있는데요, 이를 Continuous A2C라고 합니다.
.
기존 A2C와의 차이점은 Actor 신경망이 각 행동을 할 확률을 반환하는 것이 아니라 각 행동의 평균과 표준편차를 반환합니다. 그 후에 정규분포에서 평균과 표준편차를 이용해 샘플링해서 실제로 할 행동을 정하게 됩니다.
그럼 이제 실제 코드로 한번 살펴 보겠습니다.
2. 케라스로 구현하기
먼저 전체 코드입니다.
import numpy as np
import tensorflow as tf
from tensorflow_probability import distributions as tfd
import pylab
import gym
class ContinuousA2C:
def __init__(self, state_size, action_size, max_action):
self.action_size = action_size
self.state_size = state_size
self.max_action = max_action
self.std_bound = [1e-2, 1.0]
self.discount_factor = 0.95
self.actor_learning_rate = 0.0001
self.critic_learning_rate = 0.001
self.actor = self.build_actor()
self.critic = self.build_critic()
self.actor_optimizer = tf.keras.optimizers.Adam(learning_rate=self.actor_learning_rate)
self.critic_optimizer = tf.keras.optimizers.Adam(learning_rate=self.critic_learning_rate)
def build_critic(self):
critic_input = tf.keras.Input((self.state_size,))
critic = tf.keras.layers.Dense(64, activation='relu')(critic_input)
critic = tf.keras.layers.Dense(32, activation='relu')(critic)
critic = tf.keras.layers.Dense(16, activation='relu')(critic)
critic_out = tf.keras.layers.Dense(1, activation='linear')(critic)
return tf.keras.Model(critic_input, critic_out)
def build_actor(self):
actor_input = tf.keras.Input((self.state_size,))
actor = tf.keras.layers.Dense(64, activation='relu')(actor_input)
actor = tf.keras.layers.Dense(32, activation='relu')(actor)
actor = tf.keras.layers.Dense(16, activation='relu')(actor)
mu_out = tf.keras.layers.Dense(self.action_size, activation='tanh')(actor)
mu_out = tf.keras.layers.Lambda(lambda x: x * self.max_action)(mu_out)
sigma_out = tf.keras.layers.Dense(self.action_size, activation='softplus')(actor)
return tf.keras.Model(actor_input, [mu_out, sigma_out])
def get_action(self, state):
mu, sigma = self.actor(state)
sigma = tf.clip_by_value(sigma, self.std_bound[0], self.std_bound[1])
dist = tfd.Normal(loc=mu[0], scale=sigma[0])
action = dist.sample([1])[0]
action = np.clip(action, -self.max_action, self.max_action)
return action
def train_model(self, state, action, reward, next_state, done):
actor_params = self.actor.trainable_variables
critic_params = self.critic.trainable_variables
next_value = self.critic(next_state)
target = reward + (1 - done) * self.discount_factor * next_value[0]
with tf.GradientTape() as tape1:
mu, sigma = self.actor(state, training=True)
adv = tf.stop_gradient(target - self.critic(state, training=True))
dist = tfd.Normal(loc=mu, scale=sigma)
action_prob = dist.prob([action])[0]
cross_entropy = - tf.math.log(action_prob + 1e-5)
actor_loss = tf.reduce_mean(cross_entropy * adv)
actor_grads = tape1.gradient(actor_loss, actor_params)
self.actor_optimizer.apply_gradients(zip(actor_grads, actor_params))
with tf.GradientTape() as tape2:
value = self.critic(state, training=True)
critic_loss = 0.5 * tf.square(target - value[0])
critic_loss = tf.reduce_mean(critic_loss)
critic_grads = tape2.gradient(critic_loss, critic_params)
self.critic_optimizer.apply_gradients(zip(critic_grads, critic_params))
return actor_loss, critic_loss, sigma
def load_weights(self, path):
self.actor.load_weights(path + 'pendulum_actor.h5')
self.critic.load_weights(path + 'pendulum_critic.h5')
def save_weights(self, path):
self.actor.save_weights(path + 'pendulum_actor.h5')
self.critic.save_weights(path + 'pendulum_critic.h5')
def train(self, env, num_episode=1000):
scores, episodes = [], []
score_avg = 0
for e in range(num_episode):
done = False
score = 0
actor_loss_list, critic_loss_list, sigma_list = [], [], []
state, info = env.reset()
state = np.reshape(state, [1, self.state_size])
while not done:
action = self.get_action(state)
next_state, reward, term, trunc, info = env.step(action)
next_state = np.reshape(next_state, [1, self.state_size])
reward = (reward + 8) / 8
done = term or trunc
score += reward
actor_loss, critic_loss, sigma = self.train_model(state, action, reward, next_state, done)
actor_loss_list.append(actor_loss)
critic_loss_list.append(critic_loss)
sigma_list.append(sigma)
state = next_state
if done:
score_avg = 0.9 * score_avg + 0.1 * score if score_avg != 0 else score
print("episode: {:3d} | score avg: {:3.2f} | actor_loss: {:.3f} | critic_loss: {:.3f} | sigma: {:.3f}".format(
e, score_avg, np.mean(actor_loss_list), np.mean(critic_loss_list), np.mean(sigma_list)))
scores.append(score_avg)
episodes.append(e)
pylab.plot(episodes, scores, 'b')
pylab.xlabel("episode")
pylab.ylabel("average score")
pylab.savefig("graph.png")
if e % 100 == 0:
self.save_weights('')
env = gym.make('BipedalWalker-v3', render_mode='human')
state_size = env.observation_space.shape[0]
action_size = env.action_space.shape[0]
max_action = env.action_space.high[0]
agent = ContinuousA2C(state_size, action_size, max_action)
#agent.load_weights('')
agent.train(env)
일단 에이전트 초기화 하는 부분부터 다시 살펴봅시다.
class ContinuousA2C:
def __init__(self, state_size, action_size, max_action):
self.action_size = action_size
self.state_size = state_size
self.max_action = max_action
self.std_bound = [1e-2, 1.0]
self.discount_factor = 0.95
self.actor_learning_rate = 0.0001
self.critic_learning_rate = 0.001
self.actor = self.build_actor()
self.critic = self.build_critic()
self.actor_optimizer = tf.keras.optimizers.Adam(learning_rate=self.actor_learning_rate)
self.critic_optimizer = tf.keras.optimizers.Adam(learning_rate=self.critic_learning_rate)
def build_critic(self):
critic_input = tf.keras.Input((self.state_size,))
critic = tf.keras.layers.Dense(64, activation='relu')(critic_input)
critic = tf.keras.layers.Dense(32, activation='relu')(critic)
critic = tf.keras.layers.Dense(16, activation='relu')(critic)
critic_out = tf.keras.layers.Dense(1, activation='linear')(critic)
return tf.keras.Model(critic_input, critic_out)
def build_actor(self):
actor_input = tf.keras.Input((self.state_size,))
actor = tf.keras.layers.Dense(64, activation='relu')(actor_input)
actor = tf.keras.layers.Dense(32, activation='relu')(actor)
actor = tf.keras.layers.Dense(16, activation='relu')(actor)
mu_out = tf.keras.layers.Dense(self.action_size, activation='tanh')(actor)
mu_out = tf.keras.layers.Lambda(lambda x: x * self.max_action)(mu_out)
sigma_out = tf.keras.layers.Dense(self.action_size, activation='softplus')(actor)
return tf.keras.Model(actor_input, [mu_out, sigma_out])
먼저 초기화 하는 부분을 살펴보면 상태의 크기, 행동의 크기도 받아오지만 여기서는 행동의 최대 크기도 받아옵니다.
그 다음 actor와 critic의 learning rate를 지정해주는데 critic을 actor보다 10배 크게 해주었습니다. 아무래도 값의 변화가 더 크기 때문이죠.
그 다음 actor와 critic의 모델을 빌드하는 함수를 보면
먼저 critic은 4개의 Dense Layer로 되어 있습니다. 마지막 출력층은 활성화 함수가 없습니다.
제일 중요한 actor같은 경우 mu와 sigma를 출력하게 되는데요, mu는 행동의 평균, sigma가 표준편차가 되겠습니다.
mu는 tanh 함수를 써서 -1에서 1까지의 범위로 출력되게 하고 거기에 Lambda를 이용해서 최대 행동 범위 만큼 늘려줍니다.
.
두번째는 행동 반환하는 함수와 신경망 업데이트 부분입니다.
def get_action(self, state):
mu, sigma = self.actor(state)
sigma = tf.clip_by_value(sigma, self.std_bound[0], self.std_bound[1])
dist = tfd.Normal(loc=mu[0], scale=sigma[0])
action = dist.sample([1])[0]
action = np.clip(action, -self.max_action, self.max_action)
return action
def train_model(self, state, action, reward, next_state, done):
actor_params = self.actor.trainable_variables
critic_params = self.critic.trainable_variables
next_value = self.critic(next_state)
target = reward + (1 - done) * self.discount_factor * next_value[0]
with tf.GradientTape() as tape1:
mu, sigma = self.actor(state, training=True)
adv = tf.stop_gradient(target - self.critic(state, training=True))
dist = tfd.Normal(loc=mu, scale=sigma)
action_prob = dist.prob([action])[0]
cross_entropy = - tf.math.log(action_prob + 1e-5)
actor_loss = tf.reduce_mean(cross_entropy * adv)
actor_grads = tape1.gradient(actor_loss, actor_params)
self.actor_optimizer.apply_gradients(zip(actor_grads, actor_params))
with tf.GradientTape() as tape2:
value = self.critic(state, training=True)
critic_loss = 0.5 * tf.square(target - value[0])
critic_loss = tf.reduce_mean(critic_loss)
critic_grads = tape2.gradient(critic_loss, critic_params)
self.critic_optimizer.apply_gradients(zip(critic_grads, critic_params))
return actor_loss, critic_loss, sigma
get_action()을 보면 액터로부터 mu, sigma를 가져온 뒤 sigma를 먼저 범위 제한을 해주고, tensorflow_probability 패키지의 distribution.Normal을 이용해서 정규분포를 만듭니다. 그리고 여기서 샘플을 하나 뽑아서 다시 범위 제한을 해주고 행동으로 반환하게 됩니다.
두번째는 학습하는 부분인데요 train_model() 함수를 보시면 state, action, reward, next_state, done 샘플을 가지고 액터와 크리틱 모두 가중치를 업데이트 하는 모습을 볼 수 있습니다. 가중치 업데이트에 관한 내용은 A2C와 동일 하므로 해당 포스트를 참고해주세요. 텐서플로 케라스로 구현하는 A2C
각각의 신경망에 대해서 가중치 업데이트를 진행해 줍니다.
.
마지막으로 실제로 환경에서 학습을 진행하는 부분입니다.
def train(self, env, num_episode=1000):
scores, episodes = [], []
score_avg = 0
for e in range(num_episode):
done = False
score = 0
actor_loss_list, critic_loss_list, sigma_list = [], [], []
state, info = env.reset()
state = np.reshape(state, [1, self.state_size])
while not done:
action = self.get_action(state)
next_state, reward, term, trunc, info = env.step(action)
next_state = np.reshape(next_state, [1, self.state_size])
reward = (reward + 8) / 8
done = term or trunc
score += reward
actor_loss, critic_loss, sigma = self.train_model(state, action, reward, next_state, done)
actor_loss_list.append(actor_loss)
critic_loss_list.append(critic_loss)
sigma_list.append(sigma)
state = next_state
if done:
score_avg = 0.9 * score_avg + 0.1 * score if score_avg != 0 else score
print("episode: {:3d} | score avg: {:3.2f} | actor_loss: {:.3f} | critic_loss: {:.3f} | sigma: {:.3f}".format(
e, score_avg, np.mean(actor_loss_list), np.mean(critic_loss_list), np.mean(sigma_list)))
scores.append(score_avg)
episodes.append(e)
pylab.plot(episodes, scores, 'b')
pylab.xlabel("episode")
pylab.ylabel("average score")
pylab.savefig("graph.png")
if e % 100 == 0:
self.save_weights('')
학습할 환경을 매개변수로 받아와 주고 에피소드의 수 만큼 학습을 반복해 줍니다. 매 에피소드의 시작에는 환경을 리셋해주고, 각 정보를 담을 리스트를 초기화 해 줍니다.
.
그다음 환경이 종료될 때까지 행동을 하고 가중치를 업데이트하는 것을 반복합니다. 여기서 중요한 점은 reward를 계산할 때 정규화를 진행해주었다는 점입니다. 덕분에 학습이 더욱 안정적으로 진행되게 됩니다.
3. 학습 시키기
이제 에이전트를 학습시켜보는 일 만 남았습니다.
env = gym.make('Pendulum-v1', render_mode='human')
state_size = env.observation_space.shape[0]
action_size = env.action_space.shape[0]
max_action = env.action_space.high[0]
agent = ContinuousA2C(state_size, action_size, max_action)
agent.train(env)
이런식으로 실행을 하게 되면 에이전트가 Pendulum 환경에서 학습을 진행하게 될 것입니다.
대략 400에피소드정도 학습을 진행한 에이전트가 Pendulum 환경을 해결하는 영상을 보고 마무리하겠습니다.
junhyung2338
안녕하세요. 좋은 정보가 담긴 글 감사합니다.
저는 3개의 서보모터 각도에 대한 imu센서의 roll pitch각도를 데이터화한 csv파일을 강화 학습을 통해 수평유지장치를 구현하려합니다. 데이터 전처리나 gym을 이용하지 않은 강화학습에 대해서 질문을 좀 드릴 수 있을까요?
Jan. 31, 2024, 5:44 p.m.