강화학습 : 파이썬 케라스로 구현한 액터-크리틱, A2C
Oct. 24, 2022, 11:18 p.m.
이번에는 A2C라는 강화학습 알고리즘을 가져왔습니다. 정책 이터레이션 알고리즘들의 문제점들을 보완한 알고리즘이라고 볼 수 있는데요, 자세히 알아보도록 하겠습니다.
1. A2C란?
먼저, 이전에 REINFORCE에서 사용한 정책 신경망의 업데이트 식을 다시 살펴보도록 하겠습니다.
정책에 큐함수를 곱하는 형태입니다. 하지만 큐함수 자체를 알 수가 없기 때문에 REINFORCE에서는 에피소드가 끝난 후 반환값을 큐함수 대신에 쓰는 방식을 택했고, 이것은 에피소드가 길어질 경우 반환값이 굉장히 커진다거나 에피소드의 끝이 없는 경우도 있어 문제가 되고. 여전히 큐함수가 아니라는 단점이 있었죠.
그래서 A2C에서는 큐함수 자체도 신경망으로 근사를 하려고 합니다. 정책 신경망과 가치 신경망 두개를 사용하게 되는 것이죠! 가치 함수는 정책을 평가하는 역할을 하기 때문에 Critic, 그리고 정책 신경망을 Actor로 부릅니다.
하지만 지금 소개할 알고리즘은 A2C이죠? A가 2개 C가 1개입니다. 하나의 A가 빠졌네요. 이것은 Advantage를 의미합니다.
큐함수를 이제 신경망으로 근사를 할 텐데, 이것을 그대로 적용한다면 큐함수의 값 자체에 의해 오차함수가 크게 바뀌게 되겠죠. 이것은 분산이 커지는 효과를 낳게 됩니다. 그래서 베이스라인을 정해주어야 합니다.
A2C에서는 이 베이스라인을 가치함수로 정했습니다. 가치함수는 행동에 따라서 값이 달라지지 않기 때문에 분산이 적어서 베이스라인으로 적절하죠.
그래서 Advantage, 어드밴티지는 아래와 같이 정의 됩니다.
사실 여기서 또 문제가 있습니다. 큐함수만 근사하려고 했는데 가치함수까지 근사를 해야되네요. 신경망을 3개나 써야할까요? 하지만 그렇지 않습니다. 다행히 큐함수는 가치함수로 나타낼 수 있죠. 가치함수만으로 나타낸 어드밴티지는 아래와 같습니다.
자. 이제 우리는 큐함수 대신에 가치함수를 근사하는 정책 신경망을 사용하면 되겠네요. 어드밴티지를 사용하면 정책 신경망의 업데이트 식은 이렇게 됩니다.
마지막으로 가치 신경망의 오차함수만 확인하고 갑시다.
A2C의 이론적인 배경은 다 확인을 해 본 것 같습니다. 이제 파이썬으로 구현해보겠습니다.
2. 파이썬으로 구현
파이썬 코드는 위키북스의 파이썬과 케라스로 배우는 강화학습 책을 참고하였습니다. 먼저 코드 전문입니다.
import sys
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.initializers import RandomUniform
class NN(tf.keras.Model):
def __init__(self, action_size):
super(NN, self).__init__()
self.actor_fc = Dense(24, activation='tanh')
self.actor_out = Dense(action_size, activation='softmax', kernel_initializer=RandomUniform(-1e-3, 1e-3))
self.critic_fc1 = Dense(24, activation='tanh')
self.critic_fc2 = Dense(24, activation='tanh')
self.critic_out = Dense(1, kernel_initializer=RandomUniform(-1e-3, 1e-3))
def __call__(self, x):
actor_x = self.actor_fc(x)
policy = self.actor_out(actor_x)
critic_x = self.critic_fc1(x)
critic_x = self.critic_fc2(critic_x)
value = self.critic_out(critic_x)
return policy, value
class A2C:
def __init__(self, action_size):
self.render = False
self.action_size = action_size
self.discount_factor = 0.99
self.learning_rate = 0.001
self.model = NN(self.action_size)
self.optimizer = Adam(learning_rate=self.learning_rate, clipnorm=5.0)
def get_action(self, state):
policy, _ = self.model(state)
policy = np.array(policy[0])
return np.random.choice(self.action_size, 1, p=policy)[0]
def train_model(self, state, action, reward, next_state, done):
model_params = self.model.trainable_variables
with tf.GradientTape() as tape:
policy, value = self.model(state)
_, next_value = self.model(next_state)
target = reward + (1 - done) * self.discount_factor * next_value[0]
one_hot = tf.one_hot([action], self.action_size)
action_prob = tf.reduce_sum(one_hot * policy, axis=1)
adv = tf.stop_gradient(target - value[0])
actor_loss = tf.math.log(action_prob + 1e-5) * adv
actor_loss = -tf.reduce_mean(actor_loss)
critic_loss = 0.5 * tf.square(tf.stop_gradient(target) - value[0])
critic_loss = tf.reduce_mean(critic_loss)
loss = 0.2 * actor_loss + critic_loss
gradient = tape.gradient(loss, model_params)
self.optimizer.apply_gradients(zip(gradient, model_params))
return np.array(loss)
많이 길지 않습니다. 천천히 알아봅시다.
class NN(tf.keras.Model):
def __init__(self, action_size):
super(NN, self).__init__()
self.actor_fc = Dense(24, activation='tanh')
self.actor_out = Dense(action_size, activation='softmax', kernel_initializer=RandomUniform(-1e-3, 1e-3))
self.critic_fc1 = Dense(24, activation='tanh')
self.critic_fc2 = Dense(24, activation='tanh')
self.critic_out = Dense(1, kernel_initializer=RandomUniform(-1e-3, 1e-3))
def call(self, x):
actor_x = self.actor_fc(x)
policy = self.actor_out(actor_x)
critic_x = self.critic_fc1(x)
critic_x = self.critic_fc2(critic_x)
value = self.critic_out(critic_x)
return policy, value
먼저 신경망 구성입니다. 케라스 Dense 레이어를 이용해서 두개의 신경망을 만든 모습을 볼 수 있습니다. actor 신경망은 행동의 수만큼의 값을 반환하는 출력 레이어를 가지고 있고, critic 신경망은 가치함수만 출력하면 되므로 1개의 값을 반환하는 출력 레이어를 가지고 있습니다.
call 함수를 오버라이드 해서 각각의 신경망을 거쳐 나온 값을 튜플 형태로 반환하도록 하였습니다.
이제 에이전트 부분을 자세히 살펴보겠습니다.
class A2C:
def __init__(self, action_size):
self.render = False
self.action_size = action_size
self.discount_factor = 0.99
self.learning_rate = 0.001
self.model = NN(self.action_size)
self.optimizer = Adam(learning_rate=self.learning_rate, clipnorm=5.0)
먼저 초기화 부분입니다. 필요한 하이퍼파라미터들을 정의 해주었습니다. 위에서 정의한 신경망을 model에 선언했고 옵티마이저로는 Adam을 사용합니다. clipnorm은 급격한 그래디언트가 생기는 것을 방지해줍니다.
.
def get_action(self, state):
policy, _ = self.model(state)
policy = np.array(policy[0])
return np.random.choice(self.action_size, 1, p=policy)[0]
행동을 가져오는 부분입니다. 정책신경망의 출력을 가져와서 각 행동의 확률만큼 랜덤으로 뽑아 반환합니다.
.
마지막으로 제일 중요한 학습 부분입니다.
def train_model(self, state, action, reward, next_state, done):
model_params = self.model.trainable_variables
with tf.GradientTape() as tape:
policy, value = self.model(state)
_, next_value = self.model(next_state)
target = reward + (1 - done) * self.discount_factor * next_value[0]
one_hot = tf.one_hot([action], self.action_size)
action_prob = tf.reduce_sum(one_hot * policy, axis=1)
adv = tf.stop_gradient(target - value[0])
actor_loss = tf.math.log(action_prob + 1e-5) * adv
actor_loss = -tf.reduce_mean(actor_loss)
critic_loss = 0.5 * tf.square(tf.stop_gradient(target) - value[0])
critic_loss = tf.reduce_mean(critic_loss)
loss = 0.2 * actor_loss + critic_loss
gradient = tape.gradient(loss, model_params)
self.optimizer.apply_gradients(zip(gradient, model_params))
return np.array(loss)
먼저 trainable_variables로 훈련할 가중치를 받아 옵니다. 그다음 GradientTape을 이용해서 미분을 기록합니다.
현재 state와 다음 next_state를 신경망에 넣어 현재 상태의 정책과 현재 상태의 가치함수, 그리고 다음 상태의 가치함수를 얻어옵니다. 그리고 가치 신경망의 업데이트 목표인 target을 계산합니다.
또한 정책 신경망의 업데이트 목표인 actor_loss를 계산합니다. 앞서 언급한 공식을 그대로 적용하여 어드밴티지를 구하고 오차함수를 구합니다.
여기서 중요한 것은 어드밴티지를 구하는 과정에서 tf.stop_gradient를 사용했다는 점입니다. 정책신경망을 업데이트 하기 위한 오차함수를 계산하는 과정에서 가치 신경망의 출력으로 계산한 target이 들어가게 되는데 만약 이를 그대로 둔 다면 오차함수의 gradient가 가치 신경망의 가중치도 업데이트 하게 되기 때문입니다.
마지막으로 앞서 구한 target을 이용해서 가치 신경망의 오차함수인 critic_loss를 계산합니다. 여기서도 stop_gradient를 사용하는데 이것은 현재 상태의 가치함수가 아니라 다음 상태의 가치함수를 사용하기 때문에 사용했습니다.
마지막으로 두 가치함수를 더해 최종 가치함수 loss 를 계산합니다.
그리고 tape.gradient를 통해 미분을 얻고 optimizer의 apply_gradients로 가중치를 업데이트 하는 방식입니다.
.
위와 같이 케라스로 구현을 하게 되면 간단하게 구현이 되는 모습을 볼 수 있습니다. 이제 에이전트를 이용해 직접 학습을 시켜 봐야겠죠?
3. A2C로 CartPole 학습시키기
먼저 코드 전체를 보겠습니다.
import gym
env = gym.make('CartPole-v1', render_mode='human')
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
agent = A2C(action_size)
score_avg = 0
EPISODES = 1000
for e in range(EPISODES):
done = False
score = 0
state, info = env.reset()
state = np.reshape(state, [1, state_size])
while not done:
action = agent.get_action(state)
next_state, reward, term, trunc, info = env.step(action)
next_state = np.reshape(next_state, [1, state_size])
done = term or trunc
score += reward
reward = 0.1 if not done or score == 500 else -1
loss = agent.train_model(state, action, reward, next_state, done)
state = next_state
if done:
score_avg = 0.9 * score_avg + score * 0.1
print(f'e : {e} | score {score_avg} | loss {loss}')
if score_avg > 400:
e = EPISODES
OPEN AI gym 을 사용하기 위해서는 먼저 설치가 필요합니다. OPEN AI gym 을 참고하세요.
.
자 먼저 학습 초기 설정하는 부분입니다.
import gym
env = gym.make('CartPole-v1', render_mode='human')
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
agent = A2C(action_size)
score_avg = 0
EPISODES = 1000
gym 패키지 임포트 했고, gym.make로 학습 환경을 가져옵니다. CartPole이라는 환경을 가져왔습니다. render_mode는 꼭 human으로 해주어야 합니다.
그다음 해당 학습 환경의 상태크기, 행동의 종류를 받아옵니다.
그리고 위에서 만들어준 A2C 에이전트를 선언하고 점수를 담을 변수와 총 학습할 에피소드 수를 선언합니다.
.
매 에피소드마다 초기화 해주는 부분입니다.
for e in range(EPISODES):
done = False
score = 0
state, info = env.reset()
state = np.reshape(state, [1, state_size])
done 이라는 변수는 해당 에피소드가 끝났는지 저장하는 변수이고, 해당 에피소드의 점수를 담을 score를 선언해 주었습니다.
그다음 env.reset()을 통해 환경을 초기화 해 줍니다. 현재 초기 상태 state와 추가정보 info 를 반환하는데 info는 그냥 빈 딕셔너리를 반환하더라구요. 크게 쓸모는 없지만 return 하기 때문에 받아줍니다.
그다음 state을 reshape 해줍니다. 배치 형태로 리스트 안에 둘러싸인 형태로 반환되기 때문입니다.
.
while not done:
action = agent.get_action(state)
next_state, reward, term, trunc, info = env.step(action)
next_state = np.reshape(next_state, [1, state_size])
done = term or trunc
score += reward
reward = 0.1 if not done or score == 500 else -1
loss = agent.train_model(state, action, reward, next_state, done)
state = next_state
학습이 진행되는 부분입니다. 먼저 agent에게 현재 상태를 집어넣고 할 행동 action을 받아옵니다.
그 다음 env.step()을 통해 환경에 해당 행동을 전달하고 그 결과인 다음상태 next_state와 보상, 그리고 종료 여부인 term과 trunc를 받아옵니다. 이 두가지 정보는 살짝 다른데 여기서는 거의 동일하다고 보면 됩니다. 그리고 역시 마지막으로 추가정보 info(여기서는 빈 딕셔너리임)를 받아옵니다.
term 혹은 trunc가 참일 경우 에피소드가 종료되었다고 판단하기 때문에 해당 정보를 done에 저장해줍니다.
그 다음 score에 보상을 더해주고 학습 용으로 보상을 재 정의 해줍니다. 매 프레임마다 0.1씩 받고 에피소드가 끝났을 경우 점수가 500이 넘지 않았다면 충분한 시간을 버티지 못하고 에피소드가 끝났다고 판단하고 -1을 보상으로 줍니다.
이 정보를 agent에 전달해 train_model()로 학습을 진행합니다.
마지막으로 현재 상태에 다음 상태를 저장합니다.
.
if done:
score_avg = 0.9 * score_avg + score * 0.1
print(f'e : {e} | score {score_avg} | loss {loss}')
if score_avg > 400:
e = EPISODES
마지막입니다. 에피소드가 종료 되었을 때 score_avg에 이동평균을 적용하고 이번 에피소드의 결과를 출력합니다. 그리고 score_avg가 400이 넘었다면 충분히 학습되었다고 생각하고 e에 목표 EPISODE 수를 넣어주고 학습이 종료되게 합니다.
4. 학습 결과
이 상태로 학습을 진행하게 되면 아래와 같이 결과가 출력되는 모습을 볼 수 있을 것입니다.
e : 221 | score 311.78435682068545 | loss 16.955520629882812
e : 222 | score 330.6059211386169 | loss 10.153448104858398
e : 223 | score 318.4453290247552 | loss 15.554280281066895
e : 224 | score 308.3007961222797 | loss 13.723217010498047
e : 225 | score 301.57071651005174 | loss 12.083921432495117
e : 226 | score 289.3136448590466 | loss 12.890955924987793
e : 227 | score 279.982280373142 | loss 16.286365509033203
e : 228 | score 271.78405233582777 | loss 7.946560859680176
e : 229 | score 268.305647102245 | loss 7.667291641235352
e : 230 | score 255.27508239202052 | loss 7.135240077972412
e : 231 | score 247.94757415281848 | loss 6.175633430480957
e : 232 | score 236.3528167375366 | loss 7.966721534729004
e : 233 | score 224.61753506378295 | loss 9.910347938537598
e : 234 | score 213.35578155740464 | loss 8.028792381286621
e : 235 | score 204.22020340166418 | loss 6.740499973297119
e : 236 | score 200.09818306149776 | loss 5.180319309234619
e : 237 | score 194.58836475534798 | loss 4.8157196044921875
e : 238 | score 186.0295282798132 | loss 4.695494174957275
e : 239 | score 182.4265754518319 | loss 3.76145601272583
e : 240 | score 184.7839179066487 | loss 2.081494092941284
e : 241 | score 183.70552611598384 | loss 0.40638697147369385
e : 242 | score 185.53497350438545 | loss 0.8901634812355042
e : 243 | score 194.1814761539469 | loss 15.922541618347168
e : 244 | score 196.86332853855222 | loss 0.8887766599655151
e : 245 | score 214.97699568469702 | loss 1.5858614444732666
e : 246 | score 221.27929611622733 | loss 1.0099419355392456
e : 247 | score 221.6513665046046 | loss 6.9734206199646
e : 248 | score 228.58622985414414 | loss 2.3435380458831787
e : 249 | score 234.52760686872975 | loss 9.800880432128906
e : 250 | score 233.07484618185677 | loss 3.8369460105895996
그리고 새 pygame창이 열리면서 실제로 카트폴이 진행되는 모습도 볼 수 있을 것입니다.
마지막으로 충분히 학습된 A2C 에이전트 영상을 보고 마무리 하겠습니다.