def preprocess_input_sequences(self, data, shuffle=True):
"""
Preprocess one batch of input:
shuffle the data if requested,
pad/truncate questions and documents to fixed lengths (post padding / post truncating),
and build y_true of length self.A_len as a one-hot vector with the 1 at index 0 (the correct candidate).
"""
documents, questions, answer, candidates = self.union_shuffle(data) if shuffle else data
d_lens = [len(i) for i in documents]
questions_ok = pad_sequences(questions, maxlen=self.q_len, dtype="int32", padding="post", truncating="post")
documents_ok = pad_sequences(documents, maxlen=self.d_len, dtype="int32", padding="post", truncating="post")
context_mask = K.eval(tf.sequence_mask(d_lens, self.d_len, dtype=tf.float32))
candidates_ok = pad_sequences(candidates, maxlen=self.A_len, dtype="int32", padding="post", truncating="post")
y_true = np.zeros_like(candidates_ok)
y_true[:, 0] = 1
return questions_ok, documents_ok, context_mask, candidates_ok, y_true
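# Hedged illustration (toy data, not from the original source; assumes Keras'
# pad_sequences and numpy): what the post-padding and the index-0 one-hot target
# built above look like for A_len = 4.
import numpy as np
from keras.preprocessing.sequence import pad_sequences
toy_candidates = [[7, 3, 9], [4, 8]]
toy_candidates_ok = pad_sequences(toy_candidates, maxlen=4, dtype="int32",
                                  padding="post", truncating="post")
# -> [[7, 3, 9, 0], [4, 8, 0, 0]]
toy_y_true = np.zeros_like(toy_candidates_ok)
toy_y_true[:, 0] = 1   # the correct answer is always the candidate at index 0
# -> [[1, 0, 0, 0], [1, 0, 0, 0]]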
def prepare_split_vec_dataset(dataset, word_index, padding = True, prem_len = None, hypo_len = None):
P = []
H = []
y = []
for example in dataset:
if example[2] == '-':
continue
P.append(load_word_indices(example[0], word_index))
H.append(load_word_indices(example[1], word_index))
y.append(LABEL_LIST.index(example[2]))
one_hot_y = np.zeros((len(y), len(LABEL_LIST)))
one_hot_y[np.arange(len(y)), y] = 1
if padding:
P = pad_sequences(P, prem_len, padding='pre')
H = pad_sequences(H, hypo_len, padding='post')
return np.array(P), np.array(H), one_hot_y
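# Hedged illustration (toy data, not from the original source) of the label one-hot
# and the asymmetric padding used above: premises are padded on the left ('pre'),
# hypotheses on the right ('post'), assuming a 3-way label set as in SNLI.
import numpy as np
from keras.preprocessing.sequence import pad_sequences
toy_y = [0, 2, 1]
toy_one_hot = np.zeros((len(toy_y), 3))
toy_one_hot[np.arange(len(toy_y)), toy_y] = 1   # rows: [1,0,0], [0,0,1], [0,1,0]
toy_P = pad_sequences([[5, 6], [7]], maxlen=3, padding='pre')    # [[0,5,6], [0,0,7]]
toy_H = pad_sequences([[8], [9, 10]], maxlen=3, padding='post')  # [[8,0,0], [9,10,0]]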
def next_batch(self):
inverse_vocabulary = self.inverse_vocabulary
if self.stream:
q = [[inverse_vocabulary[word] for word in next(self.questions).strip().split() ] for i in range(self.batch_size)]
a = [[inverse_vocabulary[word] for word in next(self.answers).strip().split() ] for i in range(self.batch_size)]
else:
n_example = len(self.answers)
indices = random.randint(0, n_example, size=(self.batch_size))
q = [[inverse_vocabulary[word] for word in self.questions[i].split()] for i in indices]
a = [[inverse_vocabulary[word] for word in self.answers[i].split()] for i in indices]
X = pad_sequences(q, maxlen=self.sequence_length)
y = pad_sequences(a, maxlen=self.sequence_length)
if self.one_hot_target:
return (X, self.to_one_hot(y))
else:
return (X, y)
def generate_sentence_batch(sents, word2id, max_seqlen, batch_size):
while True:
# loop once per epoch
# shuffle the input
indices = np.random.permutation(np.arange(len(sents)))
shuffled_sents = [sents[ix] for ix in indices]
# convert to list of list of word id
sent_wids = [[word2id[word] for word in sent.split()]
for sent in shuffled_sents]
num_batches = len(shuffled_sents) // batch_size
for bid in range(num_batches):
# loop once per batch
sents_batch = sent_wids[bid * batch_size : (bid + 1) * batch_size]
sents_batch_padded = sequence.pad_sequences(sents_batch, max_seqlen)
yield sents_batch_padded, sents_batch_padded
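# Hedged usage sketch (toy data, not from the original source; assumes the imports of
# the snippet above): the generator yields identical input/target batches, as expected
# by a sequence autoencoder.
toy_sents = ["the cat sat", "a dog ran", "the dog sat"]
toy_word2id = {"the": 1, "cat": 2, "sat": 3, "a": 4, "dog": 5, "ran": 6}
toy_gen = generate_sentence_batch(toy_sents, toy_word2id, max_seqlen=4, batch_size=2)
x_batch, y_batch = next(toy_gen)   # both are (2, 4) arrays of word ids, zero-padded on the left
# A model would typically consume it via model.fit_generator(toy_gen, steps_per_epoch=..., epochs=...)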
############################ main ###############################
def test(self, sentence, model, words):
"""
Test a single sentence.
:param sentence: the sentence to classify; if ischar is False it must already be word-segmented
:param model: cnn model
:param words: word-to-index dictionary
:return: the model's prediction
"""
if self.ischar is True:
sentence = list(sentence)
else:
sentence = sentence.split()
x_test = [[words[w] for w in sentence if w in words]]
x_test = sequence.pad_sequences(x_test, maxlen=self.maxlen)
pred_y = model.predict(x_test)
return pred_y
def test_pad_sequences():
a = [[1], [1, 2], [1, 2, 3]]
# test padding
b = pad_sequences(a, maxlen=3, padding='pre')
assert_allclose(b, [[0, 0, 1], [0, 1, 2], [1, 2, 3]])
b = pad_sequences(a, maxlen=3, padding='post')
assert_allclose(b, [[1, 0, 0], [1, 2, 0], [1, 2, 3]])
# test truncating
b = pad_sequences(a, maxlen=2, truncating='pre')
assert_allclose(b, [[0, 1], [1, 2], [2, 3]])
b = pad_sequences(a, maxlen=2, truncating='post')
assert_allclose(b, [[0, 1], [1, 2], [1, 2]])
# test value
b = pad_sequences(a, maxlen=3, value=1)
assert_allclose(b, [[1, 1, 1], [1, 1, 2], [1, 2, 3]])
def train(self, X_train, V, seed):
X_train = sequence.pad_sequences(X_train, maxlen=self.max_len)
np.random.seed(seed)
X_train = np.random.permutation(X_train)
np.random.seed(seed)
V = np.random.permutation(V)
print("Train...CNN module")
#history = self.model.fit({'input': X_train, 'output': V},
# verbose=0, batch_size=self.batch_size, nb_epoch=self.nb_epoch, shuffle=True, validation_split=0.1, callbacks=[EarlyStopping(monitor='val_loss', patience=0)])
history = self.model.fit(X_train,y=V,batch_size=self.batch_size,nb_epoch=self.nb_epoch, shuffle=True, validation_split=0.1, callbacks=[EarlyStopping(monitor='val_loss', patience=0)])
cnn_loss_his = history.history['loss']
cmp_cnn_loss = sorted(cnn_loss_his)[::-1]
if cnn_loss_his != cmp_cnn_loss:
self.nb_epoch = 1
return history
def vectorize_ques(data, word_id, test_max_length, ques_max_length):
X = []
Xq = []
for subtext, question in data:
x = [word_id[w] for w in subtext]
xq = [word_id[w] for w in question]
# let's not forget that index 0 is reserved
X.append(x)
Xq.append(xq)
return (pad_sequences(X, maxlen=test_max_length),
pad_sequences(Xq, maxlen=ques_max_length))
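# Hedged note on the "index 0 is reserved" comment above (assumes pad_sequences is
# imported as in the snippet): padding fills with 0, so word ids should start at 1.
assert (pad_sequences([[1, 2]], maxlen=4) == [[0, 0, 1, 2]]).all()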
# Vectorize the text
# Convert Subtext, Questions, Answers to Vector Form
# Y: array of zeros with a single 1 at the index of the word that is the correct answer
def vectorize_text(data, word_id, text_max_length, ques_max_length):
X = []
Xq = []
Y = []
for subtext, question, answer in data:
x = [word_id[w] for w in subtext]
# Save the ID of Questions using SubText
xq = [word_id[w] for w in question]
# Save the answers for the Questions in "Y" as "1"
y = np.zeros(len(word_id) + 1)
y[word_id[answer]] = 1
X.append(x)
Xq.append(xq)
Y.append(y)
return (pad_sequences(X, maxlen=text_max_length),
pad_sequences(Xq, maxlen=ques_max_length),
np.array(Y))
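# Hedged usage sketch (toy vocabulary, not from the original source; assumes the imports
# of the snippet above) for vectorize_text:
toy_word_id = {'mary': 1, 'went': 2, 'to': 3, 'the': 4, 'garden': 5, 'where': 6, 'is': 7}
toy_data = [(['mary', 'went', 'to', 'the', 'garden'], ['where', 'is', 'mary'], 'garden')]
tX, tXq, tY = vectorize_text(toy_data, toy_word_id, text_max_length=6, ques_max_length=4)
# tX: (1, 6) padded subtext ids, tXq: (1, 4) padded question ids,
# tY: (1, 8) zeros with a single 1 at index toy_word_id['garden'] == 5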
# Read the text files
def create_train_and_test(self, examples):
d = [[], []]
for i, s, dep in examples:
d[i].append((i, s, dep))
random.seed(1)
random.shuffle(d[0])
random.shuffle(d[1])
if self.equalize_classes:
l = min(len(d[0]), len(d[1]))
examples = d[0][:l] + d[1][:l]
else:
examples = d[0] + d[1]
random.shuffle(examples)
Y, X, deps = zip(*examples)
Y = np.asarray(Y)
X = sequence.pad_sequences(X, maxlen=self.maxlen)
n_train = int(self.prop_train * len(X))
self.X_train, self.Y_train = X[:n_train], Y[:n_train]
self.X_test, self.Y_test = X[n_train:], Y[n_train:]
self.deps_train = deps[:n_train]
self.deps_test = deps[n_train:]
def __init__(self, widths, vocab_size=5000):
from keras.models import Sequential
from keras.layers import Embedding, Dense, TimeDistributedMerge
from keras.layers.advanced_activations import ELU
from keras.preprocessing.sequence import pad_sequences
from keras.optimizers import SGD
self.n_classes = widths[-1]
self.vocab_size = vocab_size
self.word_to_int = {}
self.int_to_word = np.ndarray(shape=(vocab_size+1,), dtype='int64')
self.model = Sequential()
self.model.add(Embedding(vocab_size, widths[0]))
self.model.add(TimeDistributedMerge(mode='ave'))
for width in widths[1:-1]:
layer = Dense(output_dim=width, init='he_normal', activation=ELU(1.0))
self.model.add(layer)
self.model.add(
Dense(
self.n_classes,
init='zero',
activation='softmax'))
sgd = SGD(lr=0.01, decay=1e-6, momentum=0.9, nesterov=True)
self.model.compile(loss='categorical_crossentropy', optimizer=sgd)
def subj_run(index_embedding, dataset, num_words=5000, embedding_len=100, max_len=50):
(x_train, y_train), (x_test, y_test) = ds.load_data(dataset, num_words)
x_train = sequence.pad_sequences(x_train, maxlen=max_len)
x_test = sequence.pad_sequences(x_test, maxlen=max_len)
model = Sequential()
model.add(Embedding(num_words, embedding_len, input_length=max_len, weights=[index_embedding]))
model.add(LSTM(max_len, dropout=0.5, recurrent_dropout=0.5))
model.add(Dense(1, activation='sigmoid'))
model.compile(loss='binary_crossentropy',
optimizer='adam',
metrics=['accuracy'])
print(model.summary())
model.fit(x_train, y_train, epochs=4, batch_size=50, verbose=2)
score, acc = model.evaluate(x_test, y_test, verbose=0)
print('Test score:', score)
print('Test accuracy:', acc)
def imdb_run(index_embedding, dataset, num_words=5000, embedding_len=100, max_len=500):
(x_train, y_train), (x_test, y_test) = ds.load_data(dataset, num_words)
x_train = sequence.pad_sequences(x_train, maxlen=max_len)
x_test = sequence.pad_sequences(x_test, maxlen=max_len)
model = Sequential()
model.add(Embedding(num_words, embedding_len, input_length=max_len, weights=[index_embedding]))
model.add(LSTM(100, dropout=0.2, recurrent_dropout=0.2))
model.add(Dense(1, activation='sigmoid'))
model.compile(loss='binary_crossentropy',
optimizer='adam',
metrics=['accuracy'])
print(model.summary())
model.fit(x_train, y_train, epochs=3, batch_size=64, verbose=2)
score, acc = model.evaluate(x_test, y_test, verbose=0)
print('Test score:', score)
print('Test accuracy:', acc)
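# Hedged sketch of how an `index_embedding` matrix like the one passed to
# Embedding(..., weights=[index_embedding]) above could be built; `word_index`
# (word -> id) and `pretrained_vectors` (word -> vector dict) are assumptions,
# not part of the original source.
import numpy as np
def build_index_embedding(word_index, pretrained_vectors, num_words=5000, embedding_len=100):
    matrix = np.zeros((num_words, embedding_len))
    for word, idx in word_index.items():
        if idx < num_words and word in pretrained_vectors:
            matrix[idx] = pretrained_vectors[word]   # rows for unknown words stay all-zero
    return matrix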
def fit(self, X_train, y_train, X_test, y_test,
batch_size=100, nb_epoch=3, show_accuracy=True):
"""
:param X_train: each instance is a list of word indices
:param y_train: one class label per instance
:return: None; trains self.model in place
"""
print(len(X_train), 'train sequences')
print(len(X_test), 'test sequences')
print("Pad sequences (samples x time)")
X_train = sequence.pad_sequences(X_train, maxlen=self.maxlen)
X_test = sequence.pad_sequences(X_test, maxlen=self.maxlen)
print('X_train shape:', X_train.shape)
print('X_test shape:', X_test.shape)
y_train = expand_label(y_train)
y_test = expand_label(y_test)
self.model.fit(X_train, y_train, batch_size=batch_size, nb_epoch=nb_epoch,
show_accuracy=True, validation_data=(X_test, y_test))
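# Hedged illustration of the input format described in the docstring above (toy values):
# X_train = [[12, 7, 4], [3, 9]]   # each instance is a list of word indices
# y_train = [1, 0]                 # one class label per instance, expanded by expand_label()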
def fit(self, X_train, y_train, X_test, y_test,
batch_size=50, nb_epoch=3):
"""
:param X_train: each instance is a list of word indices
:param y_train: one class label per instance
:return: None; trains self.model in place
"""
print(len(X_train), 'train sequences')
print(len(X_test), 'test sequences')
print("Pad sequences (samples x time)")
X_train = sequence.pad_sequences(X_train, maxlen=self.maxlen)
X_test = sequence.pad_sequences(X_test, maxlen=self.maxlen)
print('X_train shape:', X_train.shape)
print('X_test shape:', X_test.shape)
y_train = expand_label(y_train)
y_test = expand_label(y_test)
#early stopping
early_stop = EarlyStopping(monitor='val_loss', patience=2)
self.model.fit({'input': X_train, 'output': y_train}, batch_size=batch_size, nb_epoch=nb_epoch,
verbose=1, validation_data=({'input': X_test, 'output': y_test}), callbacks=[early_stop])
def get_questions_matrix(split):
if split == 'train':
data_path = 'data/train_qa'
elif split == 'val':
data_path = 'data/val_qa'
else:
print('Invalid split!')
sys.exit()
df = pd.read_pickle(data_path)
questions = df[['question']].values.tolist()
word_idx = ebd.load_idx()
seq_list = []
for question in questions:
words = word_tokenize(question[0])
seq = []
for word in words:
seq.append(word_idx.get(word,0))
seq_list.append(seq)
question_matrix = pad_sequences(seq_list)
return question_matrix
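# Hedged note on the call above: with maxlen omitted, pad_sequences pads every question
# to the length of the longest one in seq_list, zero-padding on the left by default,
# e.g. pad_sequences([[3], [4, 5, 6]]) -> [[0, 0, 3], [4, 5, 6]].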
def loadTestData(folderName):
data_train = pd.read_csv(folderName + 'data/test_datum.txt', sep='\t', error_bad_lines=False)
labels = []
for idx in range(data_train.question.shape[0]):
labels.append(data_train.value[idx])
texts_c3 = pickle.load(open(folderName + 'test_lemmas_c', 'rb'))
texts_q3 = pickle.load(open(folderName + 'test_lemmas_q', 'rb'))
texts_a3 = pickle.load(open(folderName + 'test_lemmas_a', 'rb'))
tokenizer = pickle.load(open(folderName + 'structures/tokenizer', 'rb'))
sequences_q = tokenizer.texts_to_sequences(texts_q3)
sequences_a = tokenizer.texts_to_sequences(texts_a3)
sequences_c = tokenizer.texts_to_sequences(texts_c3)
word_index = tokenizer.word_index
print('Found %s unique tokens.' % len(word_index))
data_q = pad_sequences(sequences_q, maxlen=MAX_SEQUENCE_LENGTH_Q)
data_a = pad_sequences(sequences_a, maxlen=MAX_SEQUENCE_LENGTH_A)
data_c = pad_sequences(sequences_c, maxlen=MAX_SEQUENCE_LENGTH_C)
labels = to_categorical(np.asarray(labels))
print('Shape of label tensor:', labels.shape)
return [data_c, data_q, data_a, labels, data_train]
def build_tensor(filename, numrecs, word2index, maxlen,
make_categorical=False):
data = np.empty((numrecs, ), dtype=list)
fin = open(filename, "rb")
i = 0
for line in fin:
wids = []
for word in line.strip().split():
if word in word2index:
wids.append(word2index[word])
else:
wids.append(word2index["UNK"])
if make_categorical:
data[i] = np_utils.to_categorical(
wids, num_classes=len(word2index))
else:
data[i] = wids
i += 1
fin.close()
pdata = sequence.pad_sequences(data, maxlen=maxlen)
return pdata
def generate_batch(s_sents, s_word2index, t_sents, t_word2index,
batch_size, maxlen):
while True:
# shuffle the input
indices = np.random.permutation(np.arange(len(s_sents)))
ss_sents = [s_sents[ix] for ix in indices]
ts_sents = [t_sents[ix] for ix in indices]
# convert to word indices
si_sents = [[get_or_else(s_word2index, word, s_word2index["UNK"])
for word in sent]
for sent in ss_sents]
ti_sents = [[t_word2index[word] for word in sent]
for sent in ts_sents]
# inner loop should run for an epoch
num_batches = len(s_sents) // batch_size
for i in range(num_batches):
s_batch = si_sents[i * batch_size : (i + 1) * batch_size]
t_batch = ti_sents[i * batch_size : (i + 1) * batch_size]
sp_batch = sequence.pad_sequences(s_batch, maxlen=maxlen)
tp_batch = sequence.pad_sequences(t_batch, maxlen=maxlen)
tpc_batch = np_utils.to_categorical(tp_batch.reshape(-1, 1),
num_classes=len(t_word2index)).reshape(batch_size,
-1, len(t_word2index))
yield sp_batch, tpc_batch
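# Hedged illustration (toy sizes, not from the original source; assumes np and np_utils
# are imported as in the snippet above) of the reshape trick used for the targets:
# flatten the padded batch, one-hot encode it, then reshape back to (batch, maxlen, vocab).
toy_batch = np.array([[1, 0], [2, 1]])   # (batch=2, maxlen=2) word ids
toy_targets = np_utils.to_categorical(toy_batch.reshape(-1, 1),
                                      num_classes=3).reshape(2, -1, 3)
# toy_targets.shape == (2, 2, 3); toy_targets[0, 0] == [0., 1., 0.]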