深度学习之 RNN 台词生成
本文使用 PyTorch 编写一个基于 RNN 的台词生成程序。
import os
def load_data(path):
    """Return the entire contents of the file at ``path`` decoded as UTF-8."""
    with open(path, 'r', encoding="utf-8") as handle:
        return handle.read()
# Load the raw script and drop its first 81 characters
# (presumably a header/notice in the data file -- verify against the file).
text = load_data('./moes_tavern_lines.txt')[81:]
# Split sizes are measured in characters: 60% train, 20% validation.
train_count = int(len(text) * 0.6)
val_count = int(len(text) * 0.2)
# NOTE(review): test_count is not referenced in the visible code; the test
# slice below simply takes everything after the validation slice.
test_count = int(len(text) * 0.2)
train_text = text[:train_count]
val_text = text[train_count: train_count + val_count]
test_text = text[train_count + val_count:]
# (start, stop) line indices used by the preview printed further down.
view_sentence_range = (0, 10)
import numpy as np
# Print summary statistics for the corpus, then preview a few of its lines.
print("data set State")
# Whitespace-separated tokens, de-duplicated.
print("Roughly the number of unique words: {}".format(len(set(text.split()))))

# Scenes are separated by a blank line.
scenes = text.split("\n\n")
print("number of scenes: {}".format(len(scenes)))

# Newline count inside each scene.
sentence_count_scene = [block.count('\n') for block in scenes]
print('Average number for sentences in each scene: {}'.format(np.average(sentence_count_scene)))

# Flatten every scene into its individual lines.
sentences = [line for block in scenes for line in block.split('\n')]
print("Number for lines: {}".format(len(sentences)))

word_count_sentence = [len(line.split()) for line in sentences]
print('Average number for words in each line: {}'.format(np.average(word_count_sentence)))

print()
print('The sentences {} to {}:'.format(view_sentence_range[0], view_sentence_range[1]))
print('\n'.join(text.split('\n')[slice(*view_sentence_range)]))
def token_lookup():
    """Return a new dict mapping each punctuation symbol to its placeholder token.

    A fresh dict is built on every call, so callers may mutate the result
    without affecting one another.
    """
    symbol_token_pairs = (
        ('.', '||Period||'),
        (',', '||Comma||'),
        ('"', '||Quotation_Mark||'),
        (';', '||Semicolon||'),
        ('!', '||Exclamation_mark||'),
        ('?', '||Question_mark||'),
        ('(', '||Left_Parentheses||'),
        (')', '||Right_Parentheses||'),
        ('--', '||Dash||'),
        ('\n', '||Return||'),
    )
    return dict(symbol_token_pairs)
import os
import torch
class Dictionary(object):
    """Growable vocabulary mapping words to integer ids and back.

    ``word2idx`` maps a word to its id; ``idx2word`` is the list of words in
    insertion order, so ``idx2word[word2idx[w]] == w``.
    """

    def __init__(self):
        self.word2idx = {}
        self.idx2word = []

    def add_word(self, word):
        """Register ``word`` if it is new and return its integer id."""
        index = self.word2idx.get(word)
        if index is None:
            # New word: its id is the next free position in idx2word.
            index = len(self.idx2word)
            self.idx2word.append(word)
            self.word2idx[word] = index
        return index

    def __len__(self):
        """Return the number of distinct words registered so far."""
        return len(self.idx2word)
class Corpus(object):
    """Train/validation/test splits encoded as word ids over one shared vocabulary.

    Attributes:
        dictionary: the ``Dictionary`` populated while tokenizing the splits
            (train first, then validation, then test).
        train, valid, test: 1-D ``torch.long`` tensors of word ids.
    """

    def __init__(self, train, val, test):
        self.dictionary = Dictionary()
        self.train = self.tokenize(train)
        self.valid = self.tokenize(val)
        self.test = self.tokenize(test)

    def tokenize(self, text):
        """Split ``text`` on whitespace and return its word ids as a 1-D long tensor.

        Unseen words are added to ``self.dictionary`` as a side effect.
        Empty or whitespace-only text yields an empty tensor.
        """
        # add_word already returns the id, so no second lookup is needed, and
        # building from a list avoids starting from an uninitialised tensor.
        ids = [self.dictionary.add_word(word) for word in text.split()]
        return torch.tensor(ids, dtype=torch.long)
import numpy as np
import torch
# Punctuation -> placeholder table, built once; also used by gen() to map
# placeholders back to punctuation.
i_dict = token_lookup()


def create_data(text):
    """Return ``text`` with every punctuation symbol replaced by its placeholder.

    Each symbol found in ``i_dict`` is replaced by its placeholder token
    surrounded by single spaces, so a later whitespace split treats
    punctuation as separate words. All other characters are copied unchanged.

    Args:
        text: raw corpus text.

    Returns:
        The converted text as a new string.
    """
    import re  # local import keeps this block independent of the file's import header

    if not i_dict:
        return text
    # Try longer symbols first so the two-character "--" is matched as a
    # whole; a per-character scan can never see it.
    symbols = sorted(i_dict, key=len, reverse=True)
    pattern = re.compile("|".join(re.escape(symbol) for symbol in symbols))
    # One regex pass instead of repeated string concatenation.
    return pattern.sub(lambda match: " {} ".format(i_dict[match.group(0)]), text)
import torch
import torch.nn as nn
from torch.autograd import Variable
# RNN language model
class RNN(nn.Module):
    """Word-level language model: embedding -> GRU -> dropout -> linear decoder.

    Args:
        input_size: number of rows in the embedding table (vocabulary size).
        hidden_size: embedding width and GRU hidden width.
        output_size: number of scores produced per time step.
        n_layers: number of stacked GRU layers.
    """

    def __init__(self, input_size, hidden_size, output_size, n_layers=1):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.n_layers = n_layers
        self.drop = nn.Dropout(0.5)
        self.encoder = nn.Embedding(input_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size, n_layers)
        self.decoder = nn.Linear(hidden_size, output_size)

    def forward(self, input, hidden):
        """Score the next word at every position.

        Args:
            input: integer tensor of word ids laid out ``(seq_len, batch)``,
                since the GRU is built with the default ``batch_first=False``.
            hidden: GRU state shaped ``(n_layers, batch, hidden_size)``.

        Returns:
            ``(scores, hidden)`` where ``scores`` is
            ``(seq_len, batch, output_size)`` and ``hidden`` is the updated state.
        """
        embedded = self.encoder(input)
        output, hidden = self.gru(embedded, hidden)
        output = self.drop(output)
        # nn.Linear is applied over the last dimension, so the 3-D tensor can
        # be decoded directly without flattening and reshaping back.
        return self.decoder(output), hidden

    def init_hidden(self, batch_size):
        """Return an all-zero GRU state for ``batch_size`` parallel sequences.

        The state is created with the dtype and device of the model's own
        parameters, so it stays valid after ``.cuda()`` / ``.double()``.
        """
        reference = next(self.parameters())
        return reference.new_zeros(self.n_layers, batch_size, self.hidden_size)
# Batching helper
def batchify(data, bsz):
    """Arrange a 1-D sequence into ``bsz`` parallel columns.

    Trailing elements that do not fill a complete row are dropped. The result
    has shape ``(len(data) // bsz, bsz)``; each column is one contiguous
    stretch of the original sequence.
    """
    rows = data.size(0) // bsz
    usable = rows * bsz
    # Keep only the prefix that divides evenly into bsz stretches.
    trimmed = data[:usable]
    # One stretch per row, then transpose so that each stretch is a column.
    stretches = trimmed.view(bsz, -1)
    return stretches.t().contiguous()
# Training hyper-parameters.
n_epochs = 3500  # number of iterations of the epoch loop below
print_every = 500  # NOTE(review): not referenced in the visible code
plot_every = 10  # NOTE(review): not referenced in the visible code
hidden_size = 100  # embedding / GRU hidden width passed to RNN
n_layers = 1  # number of GRU layers passed to RNN
lr = 0.005  # learning rate passed to the Adam optimizer
chunk_len = 10  # maximum number of time steps per slice taken by get_batch
batch_size = 20  # number of parallel columns produced by batchify
val_batch_size = 10  # NOTE(review): not referenced in the visible code; validation data is batchified with batch_size
# Data preparation: replace punctuation with placeholder tokens in each split.
train_data = create_data(train_text)
test_data = create_data(test_text)
val_data = create_data(val_text)
# Encode the three splits as word ids over one shared vocabulary.
corpus = Corpus(train_data, val_data, test_data)
# Lay each id sequence out as batch_size parallel columns.
train_source = batchify(corpus.train, batch_size)
test_source = batchify(corpus.test, batch_size)
val_source = batchify(corpus.valid, batch_size)
# Vocabulary size; used as both the input and the output size of the model.
n_tokens = len(corpus.dictionary)
# Model
model = RNN(n_tokens, hidden_size, n_tokens, n_layers)
# Optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
# Loss function
criterion = nn.CrossEntropyLoss()
# Slice one (input, target) training window out of a batchified source.
def get_batch(source, i, evaluation=False, bptt=None):
    """Return the input window starting at row ``i`` and its next-word targets.

    Args:
        source: batchified tensor whose first dimension is time.
        i: index of the first row of the window.
        evaluation: kept for backward compatibility only. It used to set the
            ``volatile`` flag, which PyTorch removed; callers that want to
            skip gradient tracking should wrap the forward pass in
            ``torch.no_grad()``.
        bptt: maximum window length; defaults to the module-level ``chunk_len``.

    Returns:
        ``(data, target)``: ``data`` is ``source[i:i + seq_len]`` and
        ``target`` is the same window shifted one row forward, flattened to
        1-D. ``seq_len`` is shortened near the end of ``source`` so the
        shifted window stays in bounds.
    """
    if bptt is None:
        bptt = chunk_len
    seq_len = min(bptt, len(source) - 1 - i)
    data = source[i:i + seq_len]
    # reshape (rather than view) also accepts a non-contiguous source.
    target = source[i + 1:i + 1 + seq_len].reshape(-1)
    return data, target
def repackage_hidden(h):
    """Detach hidden state(s) from the graph that produced them.

    Args:
        h: a tensor, or an arbitrarily nested iterable of tensors.

    Returns:
        A detached tensor sharing the same data, or a tuple mirroring the
        structure of ``h`` with every tensor detached.
    """
    # isinstance rather than an exact type comparison: since tensors and
    # Variables were merged, type(h) is torch.Tensor, so comparing against
    # Variable fails and the function would recurse into the tensor itself.
    if isinstance(h, torch.Tensor):
        return h.detach()
    return tuple(repackage_hidden(v) for v in h)
# Training
def train():
    """Run one optimisation pass over ``train_source``.

    Uses the module-level ``model``, ``optimizer``, ``criterion``, ``corpus``,
    ``batch_size`` and ``chunk_len``, and reads the module-level ``epoch``
    (set by the epoch loop) for progress messages.

    Returns:
        The mean loss per batch as a float, or NaN if no batch was processed.
    """
    model.train()
    total_loss = 0.0
    n_batches = 0
    ntokens = len(corpus.dictionary)
    hidden = model.init_hidden(batch_size)
    for batch, i in enumerate(range(0, train_source.size(0) - 1, chunk_len)):
        data, targets = get_batch(train_source, i)
        # Cut the graph at the batch boundary so back-propagation does not
        # reach into earlier batches.
        hidden = repackage_hidden(hidden)
        optimizer.zero_grad()
        output, hidden = model(data, hidden)
        loss = criterion(output.reshape(-1, ntokens), targets)
        loss.backward()
        optimizer.step()
        # Accumulate a plain float so the running total holds no tensors.
        batch_loss = loss.item()
        total_loss += batch_loss
        n_batches += 1
        if batch % 10 == 0:
            print('epoch {}/{} {}'.format(epoch, batch, batch_loss))
    return total_loss / n_batches if n_batches else float('nan')
# Validation
def evaluate(data_source):
    """Return the mean per-position loss of the model over ``data_source``.

    Uses the module-level ``model``, ``criterion`` and ``chunk_len``.

    Args:
        data_source: batchified 2-D tensor (time, batch) of word ids.

    Returns:
        The average loss as a Python float.

    Raises:
        ValueError: if ``data_source`` has fewer than two rows, because no
            (input, target) pair can then be formed.
    """
    model.eval()
    n_rows = data_source.size(0)
    if n_rows < 2:
        raise ValueError("evaluate() needs at least two rows in data_source")
    total_loss = 0.0
    # Size the hidden state from the data itself so any batch width works.
    hidden = model.init_hidden(data_source.size(1))
    # No graph is recorded under no_grad, so the hidden state needs no
    # detaching between windows.
    with torch.no_grad():
        for i in range(0, n_rows - 1, chunk_len):
            data, targets = get_batch(data_source, i, evaluation=True)
            output, hidden = model(data, hidden)
            output_flat = output.reshape(-1, output.size(-1))
            # Weight each window's mean loss by its length in rows.
            total_loss += len(data) * criterion(output_flat, targets).item()
    # The window lengths sum to n_rows - 1 (the last row is only a target).
    return total_loss / (n_rows - 1)
import time, math
# Training loop: one training pass, then one validation pass, per epoch.
for epoch in range(1, n_epochs + 1):
    train()
    val_loss = evaluate(val_source)
    # exp of the validation loss, reported alongside the loss itself.
    val_loss_exp = math.exp(val_loss)
    print("epoch {} {} {}".format(epoch, val_loss, val_loss_exp))
# Generate a passage of text
def gen(n_words):
    """Sample ``n_words`` words from the trained model.

    Starts from a uniformly random word id, then repeatedly feeds the sampled
    word back in as the next input. Placeholder tokens from ``i_dict`` are
    mapped back to their punctuation symbols in the result.

    Uses the module-level ``model``, ``corpus`` and ``i_dict``.

    Args:
        n_words: number of words to sample; values <= 0 give an empty list.

    Returns:
        A list of ``n_words`` strings.
    """
    model.eval()
    ntokens = len(corpus.dictionary)
    # Build the reverse table once instead of scanning i_dict for every word.
    placeholder_to_symbol = {token: symbol for symbol, token in i_dict.items()}
    device = next(model.parameters()).device
    hidden = model.init_hidden(1).to(device)
    current = torch.randint(ntokens, (1, 1), dtype=torch.long, device=device)
    words = []
    with torch.no_grad():
        for _ in range(n_words):
            output, hidden = model(current, hidden)
            # softmax equals exp() followed by normalisation, but does not
            # overflow when the scores are large.
            word_weights = torch.softmax(output.reshape(-1), dim=0).cpu()
            word_idx = torch.multinomial(word_weights, 1).item()
            current.fill_(word_idx)
            word = corpus.dictionary.idx2word[word_idx]
            words.append(placeholder_to_symbol.get(word, word))
    return words
# Sample 1000 words from the model and print them separated by spaces.
words = gen(1000)
print(" ".join(words))
总结
RNN 的超参数往往很难一次就调好,需要耐心地反复调整。