def generate_(self, inputs, mode='display', return_attend=False, return_all=False):
# assert self.config['sample_stoch'], 'RNNLM sampling must be stochastic'
# assert not self.config['sample_argmax'], 'RNNLM sampling cannot use argmax'
args = dict(k=self.config['sample_beam'],
maxlen=self.config['max_len'],
stochastic=self.config['sample_stoch'] if mode == 'display' else None,
argmax=self.config['sample_argmax'] if mode == 'display' else None,
return_attend=return_attend)
context, _, c_mask, _, Z, R = self.encoder.gtenc(inputs)
# c_mask[0, 3] = c_mask[0, 3] * 0
# L = context.shape[1]
# izz = np.concatenate([np.arange(3), np.asarray([1,2]), np.arange(3, L)])
# context = context[:, izz, :]
# c_mask = c_mask[:, izz]
# inputs = inputs[:, izz]
# context, _, c_mask, _ = self.encoder.encode(inputs)
# import pylab as plt
# # visualize_(plt.subplots(), Z[0][:, 300:], normal=False)
# visualize_(plt.subplots(), context[0], normal=False)
if 'explicit_loc' in self.config:
if self.config['explicit_loc']:
max_len = context.shape[1]
expLoc = np.eye(max_len, self.config['encode_max_len'], dtype='float32')[None, :, :]
expLoc = np.repeat(expLoc, context.shape[0], axis=0)
context = np.concatenate([context, expLoc], axis=2)
sample, score, ppp, _ = self.decoder.get_sample(context, c_mask, inputs, **args)
if return_all:
return sample, score, ppp
if not args['stochastic']:
score = score / np.array([len(s) for s in sample])
idz = score.argmin()
sample = sample[idz]
score = score.min()
ppp = ppp[idz]
else:
score /= float(len(sample))
return sample, np.exp(score), ppp
python类subplots()的实例源码
def generate_multiple(self, inputs, mode='display', return_attend=False, return_all=True, return_encoding=False):
# assert self.config['sample_stoch'], 'RNNLM sampling must be stochastic'
# assert not self.config['sample_argmax'], 'RNNLM sampling cannot use argmax'
args = dict(k=self.config['sample_beam'],
maxlen=self.config['max_len'],
stochastic=self.config['sample_stoch'] if mode == 'display' else None,
argmax=self.config['sample_argmax'] if mode == 'display' else None,
return_attend=return_attend,
type=self.config['predict_type']
)
'''
Return the encoding of input.
Similar to encoder.encode(), but gate values are returned as well
I think only gtenc with attention
default: with_context=False, return_sequence=True, return_embed=True
'''
"""
return
context: a list of vectors [nb_sample, max_len, 2*enc_hidden_dim], encoding of each time state (concatenate both forward and backward RNN)
_: embedding of text X [nb_sample, max_len, enc_embedd_dim]
c_mask: mask, an array showing which elements in context are not 0 [nb_sample, max_len]
_: encoding of end of X, seems not make sense for bidirectional model (head+tail) [nb_sample, 2*enc_hidden_dim]
Z: value of update gate, shape=(nb_sample, 1)
R: value of update gate, shape=(nb_sample, 1)
but.. Z and R are not used here
"""
context, _, c_mask, _, Z, R = self.encoder.gtenc(inputs)
# c_mask[0, 3] = c_mask[0, 3] * 0
# L = context.shape[1]
# izz = np.concatenate([np.arange(3), np.asarray([1,2]), np.arange(3, L)])
# context = context[:, izz, :]
# c_mask = c_mask[:, izz]
# inputs = inputs[:, izz]
# context, _, c_mask, _ = self.encoder.encode(inputs)
# import pylab as plt
# # visualize_(plt.subplots(), Z[0][:, 300:], normal=False)
# visualize_(plt.subplots(), context[0], normal=False)
if 'explicit_loc' in self.config: # no
if self.config['explicit_loc']:
max_len = context.shape[1]
expLoc = np.eye(max_len, self.config['encode_max_len'], dtype='float32')[None, :, :]
expLoc = np.repeat(expLoc, context.shape[0], axis=0)
context = np.concatenate([context, expLoc], axis=2)
sample, score, ppp, output_encoding = self.decoder.get_sample(context, c_mask, inputs, **args)
if return_all:
if return_encoding:
return context, sample, score, output_encoding
else:
return sample, score
return sample, score
def analyse_(self, inputs, outputs, idx2word, inputs_unk=None, return_attend=False, name=None, display=False):
def cut_zero(sample, idx2word, ppp=None, Lmax=None):
if Lmax is None:
Lmax = self.config['dec_voc_size']
if ppp is None:
if 0 not in sample:
return ['{}'.format(idx2word[w].encode('utf-8'))
if w < Lmax else '{}'.format(idx2word[inputs[w - Lmax]].encode('utf-8'))
for w in sample]
return ['{}'.format(idx2word[w].encode('utf-8'))
if w < Lmax else '{}'.format(idx2word[inputs[w - Lmax]].encode('utf-8'))
for w in sample[:sample.index(0)]]
else:
if 0 not in sample:
return ['{0} ({1:1.1f})'.format(
idx2word[w].encode('utf-8'), p)
if w < Lmax
else '{0} ({1:1.1f})'.format(
idx2word[inputs[w - Lmax]].encode('utf-8'), p)
for w, p in zip(sample, ppp)]
idz = sample.index(0)
return ['{0} ({1:1.1f})'.format(
idx2word[w].encode('utf-8'), p)
if w < Lmax
else '{0} ({1:1.1f})'.format(
idx2word[inputs[w - Lmax]].encode('utf-8'), p)
for w, p in zip(sample[:idz], ppp[:idz])]
if inputs_unk is None:
result, _, ppp = self.generate_(inputs[None, :],
return_attend=return_attend)
else:
result, _, ppp = self.generate_(inputs_unk[None, :],
return_attend=return_attend)
source = '{}'.format(' '.join(cut_zero(inputs.tolist(), idx2word, Lmax=len(idx2word))))
target = '{}'.format(' '.join(cut_zero(outputs.tolist(), idx2word, Lmax=len(idx2word))))
decode = '{}'.format(' '.join(cut_zero(result, idx2word)))
if display:
print(source)
print(target)
print(decode)
idz = result.index(0)
p1, p2 = [np.asarray(p) for p in zip(*ppp)]
print(p1.shape)
import pylab as plt
# plt.rc('text', usetex=True)
# plt.rc('font', family='serif')
visualize_(plt.subplots(), 1 - p1[:idz, :].T, grid=True, name=name)
visualize_(plt.subplots(), 1 - p2[:idz, :].T, name=name)
# visualize_(plt.subplots(), 1 - np.mean(p2[:idz, :], axis=1, keepdims=True).T)
return target == decode
def plot_rels(data, labels=None, colors=None, outfile="rels", latent=None, alpha=0.8):
ns, n = data.shape
if labels is None:
labels = list(map(str, range(n)))
ncol = 5
# ncol = 4
nrow = int(np.ceil(float(n * (n - 1) / 2) / ncol))
#nrow=1
#pylab.rcParams.update({'figure.autolayout': True})
fig, axs = pylab.subplots(nrow, ncol)
fig.set_size_inches(5 * ncol, 5 * nrow)
#fig.set_canvas(pylab.gcf().canvas)
pairs = list(combinations(range(n), 2)) #[:4]
pairs = sorted(pairs, key=lambda q: q[0]**2+q[1]**2) # Puts stronger relationships first
if colors is not None:
colors = (colors - np.min(colors)) / (np.max(colors) - np.min(colors)).clip(1e-7)
for ax, pair in zip(axs.flat, pairs):
if latent is None:
ax.scatter(data[:, pair[0]], data[:, pair[1]], marker='.', edgecolors='none', alpha=alpha)
else:
# cs = 'rgbcmykrgbcmyk'
markers = 'x+.o,<>^^<>,+x.'
for j, ind in enumerate(np.unique(latent)):
inds = (latent == ind)
ax.scatter(data[inds, pair[0]], data[inds, pair[1]], c=colors[inds], cmap=pylab.get_cmap("jet"),
marker=markers[j], alpha=0.5, edgecolors='none', vmin=0, vmax=1)
ax.set_xlabel(shorten(labels[pair[0]]))
ax.set_ylabel(shorten(labels[pair[1]]))
for ax in axs.flat[axs.size - 1:len(pairs) - 1:-1]:
ax.scatter(data[:, 0], data[:, 1], marker='.')
pylab.rcParams['font.size'] = 12 #6
pylab.draw()
#fig.set_tight_layout(True)
fig.tight_layout()
for ax in axs.flat[axs.size - 1:len(pairs) - 1:-1]:
ax.set_visible(False)
filename = outfile + '.png'
if not os.path.exists(os.path.dirname(filename)):
os.makedirs(os.path.dirname(filename))
fig.savefig(outfile + '.png') #df')
pylab.close('all')
return True
def make_mutual_info_plot(fn):
M.rcParams.update({'font.size': 11})
angleList = N.array([f/f.max() for f in read.extract_arr_from_h5(fn, "/history/angle", n=-1)])
mutualInfoList = read.extract_arr_from_h5(fn, "/history/mutual_info", n=-1)
quatList = read.extract_arr_from_h5(fn, "/history/quaternion", n=-1)
quatSwitchPos = N.where(quatList[:-1]-quatList[1:] != 0)[0] + 1
angsort = N.argsort(angleList[-1])
misort = N.argsort(mutualInfoList.mean(axis=0))
blkPositions = [0] + list(quatSwitchPos) + [-1]
for bp in range(len(blkPositions)-1):
(start, end) = (blkPositions[bp], blkPositions[bp+1])
curr_blk = angleList[start:end]
curr_blk2 = mutualInfoList[start:end] / N.log(quatSize[quatList[0]])
# curr_blk2 = mutualInfoList[start:end] / N.log(quatSize[quatList[bp]])
if len(curr_blk) == 0:
pass
else:
angsort = N.argsort(curr_blk[-1])
angleList[start:end] = curr_blk[:,angsort]
for n,l in enumerate(curr_blk2):
misort = N.argsort(l)
mutualInfoList[start+n] = l[misort]
P.ioff()
fig, ax = P.subplots(2, 1, sharex=True, figsize=(7, 10))
fig.subplots_adjust(hspace=0.1)
im0 = ax[0].imshow(angleList.transpose(), aspect='auto', interpolation=None, cmap=P.cm.OrRd)
ax[0].set_xlabel("iteration")
ax[0].set_ylabel("each pattern's most likely orientation\n(sorted by final orientation in each block)")
(e_min, e_max) = (1, len(angleList[0]))
e_int = 0.1*(e_max-e_min)
ax[0].plot([0, 0], [e_min, e_max], 'k-')
ax[0].text(1, e_max-e_int, "quat%d"%quatList[0], size=8, rotation=-0, ha='left', va='center', color='w', bbox=dict(boxstyle="larrow,pad=0.1",facecolor='0.1') )
for n,qs in enumerate(quatSwitchPos):
ax[0].plot([qs, qs], [e_min, e_max], 'k-')
ax[0].text(qs-1, e_max+(-n-1)*e_int, "quat%d"%quatList[qs], size=8, rotation=-0, ha='right', va='center', color='w', bbox=dict(boxstyle="rarrow,pad=0.1",facecolor='0.1') )
div0 = make_axes_locatable(ax[0])
cax0 = div0.append_axes("right", size="5%", pad=0.05)
cbar0 = P.colorbar(im0, cax=cax0)
ax[0].set_ylim(e_min, e_max)
ax[0].set_xlim(0, len(angleList)-1)
(e_min, e_max) = (1, len(mutualInfoList[0]))
e_int = 0.1*(e_max-e_min)
im1 = ax[1].imshow(mutualInfoList.transpose(), vmax=.2, aspect='auto', cmap=P.cm.YlGnBu)
ax[1].set_xlabel("iteration")
ax[1].set_ylabel("average mutual-information per dataset\n(sorted by average information)")
ax[1].plot([0, 0], [e_min, e_max], 'k-')
ax[1].text(1, e_max-e_int, "quat%d"%quatList[0], size=8, rotation=-0, ha='left', va='center', color='w', bbox=dict(boxstyle="larrow,pad=0.1",facecolor='0.1') )
for n,qs in enumerate(quatSwitchPos):
ax[1].plot([qs, qs], [e_min, e_max], 'k-')
ax[1].text(qs-1, e_max+(-n-1)*e_int, "quat%d"%quatList[qs], size=8, rotation=-0, ha='right', va='center', color='w', bbox=dict(boxstyle="rarrow,pad=0.1",facecolor='0.1') )
div1 = make_axes_locatable(ax[1])
cax1 = div1.append_axes("right", size="5%", pad=0.05)
cbar1 = P.colorbar(im1, cax=cax1)
ax[1].set_ylim(e_min, e_max)
ax[1].set_xlim(0, len(mutualInfoList)-1)
img_name = "mutual_info_plot.pdf"
P.savefig(img_name, bbox_inches='tight')
print("Image has been saved as %s" % img_name)
P.close(fig)
def generate_(self, inputs, mode='display', return_attend=False, return_all=False):
# assert self.config['sample_stoch'], 'RNNLM sampling must be stochastic'
# assert not self.config['sample_argmax'], 'RNNLM sampling cannot use argmax'
args = dict(k=self.config['sample_beam'],
maxlen=self.config['max_len'],
stochastic=self.config['sample_stoch'] if mode == 'display' else None,
argmax=self.config['sample_argmax'] if mode == 'display' else None,
return_attend=return_attend)
context, _, c_mask, _, Z, R = self.encoder.gtenc(inputs)
# c_mask[0, 3] = c_mask[0, 3] * 0
# L = context.shape[1]
# izz = np.concatenate([np.arange(3), np.asarray([1,2]), np.arange(3, L)])
# context = context[:, izz, :]
# c_mask = c_mask[:, izz]
# inputs = inputs[:, izz]
# context, _, c_mask, _ = self.encoder.encode(inputs)
# import pylab as plt
# # visualize_(plt.subplots(), Z[0][:, 300:], normal=False)
# visualize_(plt.subplots(), context[0], normal=False)
if 'explicit_loc' in self.config:
if self.config['explicit_loc']:
max_len = context.shape[1]
expLoc = np.eye(max_len, self.config['encode_max_len'], dtype='float32')[None, :, :]
expLoc = np.repeat(expLoc, context.shape[0], axis=0)
context = np.concatenate([context, expLoc], axis=2)
sample, score, ppp = self.decoder.get_sample(context, c_mask, inputs, **args)
if return_all:
return sample, score, ppp
if not args['stochastic']:
score = score / np.array([len(s) for s in sample])
idz = score.argmin()
sample = sample[idz]
score = score.min()
ppp = ppp[idz]
else:
score /= float(len(sample))
return sample, np.exp(score), ppp
def analyse_(self, inputs, outputs, idx2word, inputs_unk=None, return_attend=False, name=None, display=False):
def cut_zero(sample, idx2word, ppp=None, Lmax=None):
if Lmax is None:
Lmax = self.config['dec_voc_size']
if ppp is None:
if 0 not in sample:
return ['{}'.format(idx2word[w].encode('utf-8'))
if w < Lmax else '{}'.format(idx2word[inputs[w - Lmax]].encode('utf-8'))
for w in sample]
return ['{}'.format(idx2word[w].encode('utf-8'))
if w < Lmax else '{}'.format(idx2word[inputs[w - Lmax]].encode('utf-8'))
for w in sample[:sample.index(0)]]
else:
if 0 not in sample:
return ['{0} ({1:1.1f})'.format(
idx2word[w].encode('utf-8'), p)
if w < Lmax
else '{0} ({1:1.1f})'.format(
idx2word[inputs[w - Lmax]].encode('utf-8'), p)
for w, p in zip(sample, ppp)]
idz = sample.index(0)
return ['{0} ({1:1.1f})'.format(
idx2word[w].encode('utf-8'), p)
if w < Lmax
else '{0} ({1:1.1f})'.format(
idx2word[inputs[w - Lmax]].encode('utf-8'), p)
for w, p in zip(sample[:idz], ppp[:idz])]
if inputs_unk is None:
result, _, ppp = self.generate_(inputs[None, :],
return_attend=return_attend)
else:
result, _, ppp = self.generate_(inputs_unk[None, :],
return_attend=return_attend)
source = '{}'.format(' '.join(cut_zero(inputs.tolist(), idx2word, Lmax=len(idx2word))))
target = '{}'.format(' '.join(cut_zero(outputs.tolist(), idx2word, Lmax=len(idx2word))))
decode = '{}'.format(' '.join(cut_zero(result, idx2word)))
if display:
print source
print target
print decode
idz = result.index(0)
p1, p2 = [np.asarray(p) for p in zip(*ppp)]
print p1.shape
import pylab as plt
# plt.rc('text', usetex=True)
# plt.rc('font', family='serif')
visualize_(plt.subplots(), 1 - p1[:idz, :].T, grid=True, name=name)
visualize_(plt.subplots(), 1 - p2[:idz, :].T, name=name)
# visualize_(plt.subplots(), 1 - np.mean(p2[:idz, :], axis=1, keepdims=True).T)
return target == decode