LSTM 연습
이번 연습은 RNN기반의 Language Model을 구현해서 텍스트를 생성해보겠습니다. 파이토치를 활용하여 LSTM모델을 직접 구현하겠습니다. 데이터셋으로는 WikiTree dataset을 사용하며, 자주 사용되지 않는 단어나 영어가 아닌 단어는 이미 전처리 되어있는 데이터셋입니다.
1. 연습에 필요한 패키지를 불러오고 몇 가지 설정을 하겠습니다.
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from torch import optim
import matplotlib.pyplot as plt
import numpy as np
import scipy as sp
import tqdm
import os
import random
import time
import datetime
# for reproducibility
random.seed(1234)
np.random.seed(1234)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
2. 필요한 데이터 셋을 불러오겠습니다.
import urllib
with urllib.request.urlopen('https://raw.githubusercontent.com/yunjey/pytorch-tutorial/master/tutorials/02-intermediate/language_model/data/train.txt') as f:
data = f.readlines()
# 최대 문장 길이를 정하기 위해 시각화하여 단어의 개수가 줄어드는 지점으로 정하겠습니다.
seq_length_list = []
for line in data:
seq_length_list.append(len(line.split()))
counts, bins = np.histogram(seq_length_list, bins=20)
plt.hist(bins[:-1], bins, weights=counts)
plt.show()
# 시각화한 결과에 따라 최대 문장 길이를 50으로 정하겠습니다.
max_seq_len = 50
3. text데이터를 전처리하겠습니다. text데이터를 모델에 입력하기 위해서는 text에 존재하는 모든 단어들을 index로 변환해주어야 합니다. 이를 위해 단어를 index로 변환해주는 word2idx와 index를 단어로 변환해주는 idx2word를 만들겠습니다. 앞서 만든 word2idx를 이용하여 text로 된 wikitree데이터를 index로 변환하겠습니다.
# 모든 문장에 대해 word2idx와 idx2word를 생성하는 함수를 만들겠습니다.
def build_dictionary(data, max_seq_len):
word2idx = {}
idx2word = {}
# pad와 unk는 미리 0, 1로 index를 지정해줍니다.
word2idx['<pad>'] = 0
word2idx['<unk>'] = 1
idx2word[0] = '<pad>'
idx2word[1] = '<unk>'
# 따라서 index는 2부터 차례대로 지정하겠습니다.
idx = 2
for line in data:
words = line.decode('utf-8').split()
words = words[:max_seq_len]
# 모든 단어에 대해 index를 한개씩 지정해주겠습니다.
for word in words:
if word not in word2idx:
word2idx[word] = idx
idx2word[idx] = word
idx += 1
return word2idx, idx2word
# 주어진 wikitree데이터셋에 있는 단어를 대상으로 word2idx와 idx2word를 만들었습니다.
word2idx, idx2word = build_dictionary(data, max_seq_len)
# word2idx와 idx2word를 이용하여 데이터를 index로 만드는 함수를 구현합니다.
def preprocess(data, word2idx, max_seq_len):
tokens = []
for line in data:
words = line.decode('utf-8').split()
# 주어진 문장에 대해 최대 문장 길이만큼 단어를 뽑습니다.
words = words[:max_seq_len]
# 주어진 문장의 길이가 최대 문장 길이보다 짧다면 나머지 부분을 0으로 padding합니다.
words += ['<pad>']*(max_seq_len - len(words))
for word in words:
token = word2idx[word]
tokens.append(token)
return tokens
tokens = preprocess(data, word2idx, max_seq_len)
# 전처리 된 Token들을 문장 단위의 배열로 변환합니다.
tokens = np.array(tokens).reshape(-1,max_seq_len)
4. 전처리한 데이터를 활용하여 파이토치 Dataset과 DataLoader를 만들겠습니다.
class LMDataset(torch.utils.data.Dataset):
def __init__(self, tokens):
super(LMDataset, self).__init__()
self.PAD = 0
self.UNK = 1
self.tokens = tokens
self._getitem(2)
def _getitem(self, index):
# X에는 문장 토큰이 담긴 array, y에는 문장 토큰을 한개씩 당긴 array
# self._getitem(index)를 통해 확인해보시기 바랍니다.
X = self.tokens[index]
y = np.concatenate((X[1:], [self.PAD]))
X = torch.from_numpy(X).unsqueeze(0).long()
y = torch.from_numpy(y).unsqueeze(0).long()
return X, y
def __getitem__(self, index):
X = self.tokens[index]
y = np.concatenate((X[1:], [self.PAD]))
X = torch.from_numpy(X).long()
y = torch.from_numpy(y).long()
return X, y
def __len__(self):
return len(self.tokens)
batch_size = 64
dataset = LMDataset(tokens)
dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)
print(len(dataset))
print(len(dataloader))
4. LSTM Cell을 직접 구현해보도록 하겠습니다. LSTM의 동작방식은 아래의 그림과 같습니다. LSTM Cell 안에는 Forget gate, Input gate, Gate gate, Output gate가 존재합니다. 이 네개의 게이트를 구현하여 LSTM Cell을 구현합니다.
class LSTMCell(nn.Module):
def __init__(self, input_size, hidden_size):
super(LSTMCell, self).__init__()
# 각 gate는 input_data와 이전 시점의 hidden_state의 크기를
# 이번 시점의 hidden_state의 크기로 만듭니다.
# input-gate
self.Wi = nn.Linear(input_size + hidden_size, hidden_size)
# forget-gate
self.Wf = nn.Linear(input_size + hidden_size, hidden_size)
# gate-gate
self.Wg = nn.Linear(input_size + hidden_size, hidden_size)
# output-gate
self.Wo = nn.Linear(input_size + hidden_size, hidden_size)
# non-linearity
self.sigmoid = nn.Sigmoid()
self.tanh = nn.Tanh()
def forward(self, x, h_0, c_0):
"""
Inputs
input (x): [batch_size, input_size]
hidden_state (h_0): [batch_size, hidden_size]
cell_state (c_0): [batch_size, hidden_size]
Outputs
next_hidden_state (h_1): [batch_size, hidden_size]
next_cell_state (c_1): [batch_size, hidden_size]
"""
# h_1, c_1은 현재 hidden state, cell state를 의미하며
# h_0, c_0는 이전 hidden state, cell state를 의미합니다.
h_1, c_1 = None, None
input = torch.cat((x, h_0), 1)
# Implement LSTM cell as noted above
# LSTM의 구성대로 Input gate, forget gate, output gate는 sigmoid
# Gate gate는 tanh를 활성화함수로 택합니다.
i = self.sigmoid(self.Wi(input))
f = self.sigmoid(self.Wf(input))
g = self.tanh(self.Wg(input))
o = self.sigmoid(self.Wo(input))
# input을 통과시킨 i,f,g,o를 통해 c_1, h_1을 계산합니다.
c_1 = f * c_0 + i * g
h_1 = o * self.tanh(c_1)
return h_1, c_1
5. 위에서 만든 LSTM cell을 활용하여 Language Model을 만들겠습니다. 문장이 입력되면 임베딩하여 LSTMCell로 넘어갑니다. LSTMCell에서 연산을 거친 후 다음 step에서 필요한 hidden state와 cell state을 만들고 Loss를 계산하기 위한 output을 만듭니다. 만약, 모델이 학습이 아닌 생성을 위해 사용한다면(predict = True), output을 확률 분포로 만들고, 확률에 따라 뽑힌 sample을 반환합니다.
class LanguageModel(nn.Module):
def __init__(self, input_size=64, hidden_size=64, vocab_size=10000):
super(LanguageModel, self).__init__()
self.input_layer = nn.Embedding(vocab_size, input_size)
self.hidden_layer = LSTMCell(input_size, hidden_size)
self.output_layer = nn.Linear(hidden_size, vocab_size)
def forward(self, x, hx, cx, predict=False):
x = self.input_layer(x)
hx, cx = self.hidden_layer(x, hx, cx)
ox = self.output_layer(hx)
if predict == True:
probs = F.softmax(ox, dim=1)
# torch distribution allows sampling operation
# see https://pytorch.org/docs/stable/distributions.html
dist = torch.distributions.Categorical(probs)
ox = dist.sample()
return ox, hx, cx
6. 위에서 구현한 dataloader와 languege model을 활용하여 모델을 학습하겠습니다. 모델 학습에 필요한 Trainer class를 구현하여 모델학습을 진행하겠습니다. 5epoch마다 학습된 모델을 테스트하여 새로운 sentence를 만들어 학습의 진행 정도를 확인합니다.
class Trainer():
def __init__(self, word2idx, idx2word, dataloader, model, criterion, optimizer, device):
self.word2idx = word2idx
self.idx2word = idx2word
self.dataloader = dataloader
self.model = model
self.criterion = criterion
self.optimizer = optimizer
self.device = device
def train(self, epochs = 1):
self.model.to(self.device)
start_time = time.time()
for epoch in range(epochs):
losses = []
for iter, (x_batch, y_batch) in tqdm.tqdm(enumerate(self.dataloader)):
self.model.train()
batch_size, max_seq_len = x_batch.shape
x_batch = x_batch.to(self.device)
y_batch = y_batch.to(self.device)
# hidden_state와 cell_state는 0으로 생성
hx = torch.zeros(batch_size, hidden_size).to(self.device)
cx = torch.zeros(batch_size, hidden_size).to(self.device)
ox_batch = []
for s_idx in range(max_seq_len):
x = x_batch[:, s_idx]
ox, hx, cx = self.model(x, hx, cx)
ox_batch.append(ox)
# outputs are ordered by the time sequence
ox_batch = torch.cat(ox_batch).reshape(max_seq_len, batch_size, -1)
ox_batch = ox_batch.permute(1,0,2).reshape(batch_size*max_seq_len, -1)
y_batch = y_batch.reshape(-1)
self.model.zero_grad()
loss = self.criterion(ox_batch, y_batch)
loss.backward()
self.optimizer.step()
losses.append(loss.item())
end_time = time.time() - start_time
end_time = str(datetime.timedelta(seconds=end_time))[:-7]
print('Time [%s], Epoch [%d/%d], loss: %.4f'
% (end_time, epoch+1, epochs, np.mean(losses)))
if epoch % 5 == 0:
generated_sentences = self.test()
print('[Generated Sentences]')
for sentence in generated_sentences:
print(sentence)
def test(self):
# Test model to genereate the sentences
self.model.eval()
num_sentence = 5
max_seq_len = 50
# initial hidden-states
outs = []
x = torch.randint(0, 10000, (num_sentence,)).to(self.device)
hx = torch.zeros(num_sentence, hidden_size).to(self.device)
cx = torch.zeros(num_sentence, hidden_size).to(self.device)
outs.append(x)
with torch.no_grad():
for s_idx in range(max_seq_len-1):
x, hx, cx = self.model(x, hx, cx, predict=True)
outs.append(x)
outs = torch.cat(outs).reshape(max_seq_len, num_sentence)
outs = outs.permute(1, 0)
outs = outs.detach().cpu().numpy()
sentences = []
for out in outs:
sentence = []
for token_idx in out:
word = self.idx2word[token_idx]
sentence.append(word)
sentences.append(sentence)
return sentences
lr = 1e-2
input_size = 128
hidden_size = 128
batch_size = 256
dataset = LMDataset(tokens)
dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)
model = LanguageModel(input_size=input_size, hidden_size=hidden_size)
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(model.parameters(), lr=lr)
device = torch.device('cuda')
trainer = Trainer(word2idx = word2idx,
idx2word = idx2word,
dataloader=dataloader,
model = model,
criterion=criterion,
optimizer = optimizer,
device=device)
trainer.train(epochs=1)