# Shared imports assumed by the snippets below; project-specific helpers
# (e.g. LeCunNormal, MLP, Recurrent, QRNNLayer, md, Param) come from their
# respective repositories and are not shown here.
import csv
import os

import numpy
import numpy as np
import six

import chainer
import chainer.functions as F
import chainer.links as L
from chainer import Chain, ChainList, initializers, optimizers

def __init__(self, in_size, out_size, hidden_sizes, nonlinearity=F.relu,
last_wscale=1):
self.in_size = in_size
self.out_size = out_size
self.hidden_sizes = hidden_sizes
self.nonlinearity = nonlinearity
super().__init__()
with self.init_scope():
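        # The hidden stack is kept in a ChainList so its depth can follow
        # hidden_sizes; the output Linear uses LeCunNormal scaled by last_wscale.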
if hidden_sizes:
hidden_layers = []
hidden_layers.append(L.Linear(in_size, hidden_sizes[0]))
for hin, hout in zip(hidden_sizes, hidden_sizes[1:]):
hidden_layers.append(L.Linear(hin, hout))
self.hidden_layers = chainer.ChainList(*hidden_layers)
self.output = L.Linear(hidden_sizes[-1], out_size,
initialW=LeCunNormal(last_wscale))
else:
self.output = L.Linear(in_size, out_size,
initialW=LeCunNormal(last_wscale))
def __init__(self, n_actions, n_input_channels=4,
activation=F.relu, bias=0.1):
self.n_actions = n_actions
self.n_input_channels = n_input_channels
self.activation = activation
super().__init__()
with self.init_scope():
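        # Three convolution layers shared by both heads; a_stream outputs one
        # value per action and v_stream a single state value (dueling heads).
        # 3136 = 64 * 7 * 7, the flattened conv output for 84x84 inputs.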
self.conv_layers = chainer.ChainList(
L.Convolution2D(n_input_channels, 32, 8, stride=4,
initial_bias=bias),
L.Convolution2D(32, 64, 4, stride=2, initial_bias=bias),
L.Convolution2D(64, 64, 3, stride=1, initial_bias=bias))
self.a_stream = MLP(3136, n_actions, [512])
self.v_stream = MLP(3136, 1, [512])
def __init__(self, d, f, R, gpu):
self.d = d
self.f = f
self.R = R
self.gpu = gpu
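    # g: one Linear(1, f) per atom ID (up to AtomIdMax); H: R Linear(f, f)
    # layers; W: R + 1 Linear(f, d) layers. All are grouped into a single
    # Chain so one optimizer tracks every parameter.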
g = ChainList(*[L.Linear(1, f) for i in six.moves.range(AtomIdMax)])
H = ChainList(*[L.Linear(f, f) for i in six.moves.range(R)])
W = ChainList(*[L.Linear(f, d) for i in six.moves.range(R + 1)])
self.optimizer = optimizers.Adam()
self.model = Chain(H=H, W=W, g=g)
if gpu:
self.model.to_gpu(0)
self.optimizer.setup(self.model)
self.to = [[] for i in six.moves.range(2)]
self.atom_sid = [[] for i in six.moves.range(2)]
self.anum = [[] for i in six.moves.range(2)]
def test_addgrads(self):
l1 = chainer.Link(x=(2, 3))
l2 = chainer.Link(x=2)
l3 = chainer.Link(x=3)
c1 = chainer.ChainList(l1, l2)
c2 = chainer.ChainList(c1, l3)
l1.x.grad.fill(1)
l2.x.grad.fill(2)
l3.x.grad.fill(3)
self.l1.x.grad.fill(-1)
self.l2.x.grad.fill(-2)
self.l3.x.grad.fill(-3)
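    # c2 mirrors the structure of self.c2 (built in setUp), so addgrads adds
    # the +1/+2/+3 gradients to the -1/-2/-3 ones, which must cancel to zero.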
self.c2.addgrads(c2)
numpy.testing.assert_array_equal(self.l1.x.grad, numpy.zeros((2, 3)))
numpy.testing.assert_array_equal(self.l2.x.grad, numpy.zeros(2))
numpy.testing.assert_array_equal(self.l3.x.grad, numpy.zeros(3))
def save(self, dir_name):
dir_path = os.path.join(self._root_dir_path, dir_name)
if not os.path.exists(dir_path):
os.mkdir(dir_path)
others = []
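    # Dispatch on value type: arrays and lists go to .npy files, Chain/ChainList
    # to model.npz, Optimizers to optimizer.npz; anything else is logged as text.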
for key, value in self.items():
if key.startswith('_'):
continue
if isinstance(value, (np.ndarray, list)):
np.save(os.path.join(dir_path, key + ".npy"), value)
elif isinstance(value, (chainer.Chain, chainer.ChainList)):
model_path = os.path.join(dir_path, "model.npz")
chainer.serializers.save_npz(model_path, value)
elif isinstance(value, chainer.Optimizer):
optimizer_path = os.path.join(dir_path, "optimizer.npz")
chainer.serializers.save_npz(optimizer_path, value)
else:
others.append("{}: {}".format(key, value))
with open(os.path.join(dir_path, "log.txt"), "a") as f:
text = "\n".join(others) + "\n"
f.write(text)
def init(args):
def parse(line):
attr, pos_id = line.split()
attr = tuple(attr.split(','))
return (attr, int(pos_id))
model = md.Analyzer(
md.BidirectionalRecognizer(
md.Recognizer(256, 100, 100, 100),
md.Recognizer(256, 100, 100, 100)
),
md.Tagger(
md.BiClassifier(100),
chainer.ChainList()
)
)
optimizer = optimizers.AdaGrad(lr=0.01)
optimizer.setup(model)
return Storage(model, optimizer)
def __init__(
self, n_class, aspect_ratios,
initialW=None, initial_bias=None):
self.n_class = n_class
self.aspect_ratios = aspect_ratios
super(Multibox, self).__init__()
with self.init_scope():
self.loc = chainer.ChainList()
self.conf = chainer.ChainList()
if initialW is None:
initialW = initializers.LeCunUniform()
if initial_bias is None:
initial_bias = initializers.Zero()
init = {'initialW': initialW, 'initial_bias': initial_bias}
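    # One pair of 3x3 convolutions per source feature map: n * 4 location
    # offsets and n * n_class confidence scores, with n boxes per spatial location.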
for ar in aspect_ratios:
n = (len(ar) + 1) * 2
self.loc.add_link(L.Convolution2D(n * 4, 3, pad=1, **init))
self.conf.add_link(L.Convolution2D(
n * self.n_class, 3, pad=1, **init))
def check_chainer_model_equal(self, act, exp):
self.assertEqual(act.__class__, exp.__class__)
self.assertEqual(len(act._params), len(exp._params))
# Check for parameters
for act_param, exp_param in zip(act._params, exp._params):
act_param = getattr(act, act_param)
exp_param = getattr(exp, exp_param)
numpy.testing.assert_array_equal(act_param.data, exp_param.data)
# Recursively checking for children
if isinstance(act, chainer.ChainList):
self.assertEqual(len(act), len(exp))
for act_link, exp_link in zip(act, exp):
self.check_chainer_model_equal(act_link, exp_link)
else:
if not hasattr(act, "_children"):
return
for act_child, exp_child in zip(act._children, exp._children):
act_child = getattr(act, act_child)
exp_child = getattr(exp, exp_child)
self.check_chainer_model_equal(act_child, exp_child)
def __init__(self, *args):
super(Sequential, self).__init__()
assert len(args) > 0
assert not hasattr(self, "layers")
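    # Accepts either a single OrderedDict mapping names to layers or a plain
    # sequence of layers; Link instances are registered under init_scope so
    # their parameters are tracked by this chain.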
if len(args) == 1 and isinstance(args[0], OrderedDict):
self.layers = args[0].values()
with self.init_scope():
for key, layer in args[0].items():
if isinstance(layer, (chainer.Link, chainer.Chain, chainer.ChainList)):
setattr(self, key, layer)
else:
self.layers = args
with self.init_scope():
for idx, layer in enumerate(args):
if isinstance(layer, (chainer.Link, chainer.Chain, chainer.ChainList)):
setattr(self, str(idx), layer)
def __init__(self, in_size, out_size, hidden_sizes, normalize_input=True,
normalize_output=False, nonlinearity=F.relu, last_wscale=1):
self.in_size = in_size
self.out_size = out_size
self.hidden_sizes = hidden_sizes
self.normalize_input = normalize_input
self.normalize_output = normalize_output
self.nonlinearity = nonlinearity
super().__init__()
with self.init_scope():
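        # Like the MLP above, but the hidden layers are LinearBN and the input
        # and output can be batch-normalized; avg_var is filled with 1 instead
        # of starting at 0.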
if normalize_input:
self.input_bn = L.BatchNormalization(in_size)
self.input_bn.avg_var[:] = 1
if hidden_sizes:
hidden_layers = []
hidden_layers.append(LinearBN(in_size, hidden_sizes[0]))
for hin, hout in zip(hidden_sizes, hidden_sizes[1:]):
hidden_layers.append(LinearBN(hin, hout))
self.hidden_layers = chainer.ChainList(*hidden_layers)
self.output = L.Linear(hidden_sizes[-1], out_size,
initialW=LeCunNormal(last_wscale))
else:
self.output = L.Linear(in_size, out_size,
initialW=LeCunNormal(last_wscale))
if normalize_output:
self.output_bn = L.BatchNormalization(out_size)
self.output_bn.avg_var[:] = 1
def get_state(chain):
assert isinstance(chain, (chainer.Chain, chainer.ChainList))
state = []
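    # Collect per-child recurrent state: (c, h) for LSTM links, get_state() for
    # Recurrent links, a recursive call for nested chains, None otherwise.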
for l in chain.children():
if isinstance(l, chainer.links.LSTM):
state.append((l.c, l.h))
elif isinstance(l, Recurrent):
state.append(l.get_state())
elif isinstance(l, (chainer.Chain, chainer.ChainList)):
state.append(get_state(l))
else:
state.append(None)
return state
def stateful_links(chain):
for l in chain.children():
if isinstance(l, (chainer.links.LSTM, Recurrent)):
yield l
elif isinstance(l, (chainer.Chain, chainer.ChainList)):
for m in stateful_links(l):
yield m
def set_state(chain, state):
assert isinstance(chain, (chainer.Chain, chainer.ChainList))
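    # Walk children in the same order as get_state and restore each entry.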
for l, s in zip(chain.children(), state):
if isinstance(l, chainer.links.LSTM):
c, h = s
# LSTM.set_state doesn't accept None state
if c is not None:
l.set_state(c, h)
elif isinstance(l, Recurrent):
l.set_state(s)
elif isinstance(l, (chainer.Chain, chainer.ChainList)):
set_state(l, s)
else:
assert s is None
def __init__(self, n_input_channels, n_dim_action, n_hidden_channels,
n_hidden_layers, action_space, scale_mu=True):
self.n_input_channels = n_input_channels
self.n_hidden_layers = n_hidden_layers
self.n_hidden_channels = n_hidden_channels
assert action_space is not None
self.scale_mu = scale_mu
self.action_space = action_space
super().__init__()
with self.init_scope():
hidden_layers = []
assert n_hidden_layers >= 1
hidden_layers.append(L.Linear(n_input_channels, n_hidden_channels))
for i in range(n_hidden_layers - 1):
hidden_layers.append(
L.Linear(n_hidden_channels, n_hidden_channels))
self.hidden_layers = chainer.ChainList(*hidden_layers)
self.v = L.Linear(n_hidden_channels, 1)
self.mu = L.Linear(n_hidden_channels, n_dim_action)
self.mat_diag = L.Linear(n_hidden_channels, n_dim_action)
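            # A matrix of size n_dim_action has n_dim_action * (n_dim_action - 1) / 2
            # entries below the diagonal; mat_non_diag is only created when there are any.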
non_diag_size = n_dim_action * (n_dim_action - 1) // 2
if non_diag_size > 0:
self.mat_non_diag = L.Linear(n_hidden_channels, non_diag_size)
def test_shared_link_copy(self):
head = L.Linear(2, 2)
model_a = chainer.ChainList(head.copy(), L.Linear(2, 3))
model_b = chainer.ChainList(head.copy(), L.Linear(2, 4))
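    # head.copy() shares parameter arrays with head, so both ChainLists point
    # at the same W and b buffers; the pointer checks below confirm this.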
a_params = dict(model_a.namedparams())
b_params = dict(model_b.namedparams())
self.assertEqual(a_params['/0/W'].data.ctypes.data,
b_params['/0/W'].data.ctypes.data)
self.assertEqual(a_params['/0/b'].data.ctypes.data,
b_params['/0/b'].data.ctypes.data)
import copy
model_a_copy = copy.deepcopy(model_a)
model_b_copy = copy.deepcopy(model_b)
a_copy_params = dict(model_a_copy.namedparams())
b_copy_params = dict(model_b_copy.namedparams())
# When A and B are separately deepcopied, head is no longer shared
self.assertNotEqual(a_copy_params['/0/W'].data.ctypes.data,
b_copy_params['/0/W'].data.ctypes.data)
self.assertNotEqual(a_copy_params['/0/b'].data.ctypes.data,
b_copy_params['/0/b'].data.ctypes.data)
model_total_copy = copy.deepcopy(chainer.ChainList(model_a, model_b))
model_a_copy = model_total_copy[0]
model_b_copy = model_total_copy[1]
a_copy_params = dict(model_a_copy.namedparams())
b_copy_params = dict(model_b_copy.namedparams())
# When ChainList(A, B) is deepcopied, head is still shared!
self.assertEqual(a_copy_params['/0/W'].data.ctypes.data,
b_copy_params['/0/W'].data.ctypes.data)
self.assertEqual(a_copy_params['/0/b'].data.ctypes.data,
b_copy_params['/0/b'].data.ctypes.data)
def __init__(self, d, f, R):
self.d = d
self.f = f
self.R = R
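    # g: one Linear(1, f) per atom ID; H: five ChainLists of R Linear(f, f)
    # layers each; W: R Linear(f, d) layers, all held in ChainLists.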
g = ChainList(*[L.Linear(1, f) for i in six.moves.range(AtomIdMax)])
H = ChainList(*[ChainList(*[L.Linear(f, f)
for i in six.moves.range(R)])
for j in six.moves.range(5)])
W = ChainList(*[L.Linear(f, d) for i in six.moves.range(R)])
self.model = Chain(H=H, W=W, g=g)
self.optimizer = optimizers.Adam()
self.optimizer.setup(self.model)
def setUp(self):
self.l1 = chainer.Link(x=(2, 3))
self.l2 = chainer.Link(x=2)
self.l3 = chainer.Link(x=3)
self.c1 = chainer.ChainList(self.l1)
self.c1.add_link(self.l2)
self.c2 = chainer.ChainList(self.c1, self.l3)
def test_copyparams(self):
l1 = chainer.Link(x=(2, 3))
l2 = chainer.Link(x=2)
l3 = chainer.Link(x=3)
c1 = chainer.ChainList(l1, l2)
c2 = chainer.ChainList(c1, l3)
l1.x.data.fill(0)
l2.x.data.fill(1)
l3.x.data.fill(2)
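    # copyparams copies parameter data from c2 into self.c2 (same structure as
    # in setUp), so self.l1/l2/l3 must now equal l1/l2/l3.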
self.c2.copyparams(c2)
numpy.testing.assert_array_equal(self.l1.x.data, l1.x.data)
numpy.testing.assert_array_equal(self.l2.x.data, l2.x.data)
numpy.testing.assert_array_equal(self.l3.x.data, l3.x.data)
def init(args):
def parse(line):
attr, pos_id = line.split()
attr = tuple(attr.split(','))
return (attr, int(pos_id))
mappings = Attribute(
util.OneToOneMapping(
parse(line) for line in args.pos_def
),
util.OneToOneMapping(
(row[1], int(row[0])) for row in csv.reader(args.conj_type_def)
),
util.OneToOneMapping(
(row[1], int(row[0])) for row in csv.reader(args.conj_form_def)
)
)
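    # Three one-to-one mappings: POS tags parsed from args.pos_def, plus
    # conjugation types and forms read from CSV definition files.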
model = md.Analyzer(
md.BidirectionalRecognizer(
md.Recognizer(256, 256, 256, 256),
md.Recognizer(256, 256, 256, 64 + 256 + 128 + 128)
),
md.Tagger(
md.BiClassifier(64),
chainer.ChainList(
md.Classifier(256, len(mappings.pos)),
md.Classifier(128, len(mappings.conj_type)),
md.Classifier(128, len(mappings.conj_form))
)
)
)
optimizer = optimizers.AdaGrad(lr=0.01)
optimizer.setup(model)
return Storage(mappings, model, optimizer)
def test_shared_link(self):
"""Check interprocess parameter sharing works if models share links"""
head = L.Linear(2, 2)
model_a = chainer.ChainList(head.copy(), L.Linear(2, 3))
model_b = chainer.ChainList(head.copy(), L.Linear(2, 4))
a_arrays = async.extract_params_as_shared_arrays(
chainer.ChainList(model_a))
b_arrays = async.extract_params_as_shared_arrays(
chainer.ChainList(model_b))
print(('model_a shared_arrays', a_arrays))
print(('model_b shared_arrays', b_arrays))
head = L.Linear(2, 2)
model_a = chainer.ChainList(head.copy(), L.Linear(2, 3))
model_b = chainer.ChainList(head.copy(), L.Linear(2, 4))
async.set_shared_params(model_a, a_arrays)
async.set_shared_params(model_b, b_arrays)
print('model_a replaced')
a_params = dict(model_a.namedparams())
for param_name, param in list(a_params.items()):
print((param_name, param.data.ctypes.data))
print('model_b replaced')
b_params = dict(model_b.namedparams())
for param_name, param in list(b_params.items()):
print((param_name, param.data.ctypes.data))
# Pointers to head parameters must be the same
self.assertEqual(a_params['/0/W'].data.ctypes.data,
b_params['/0/W'].data.ctypes.data)
self.assertEqual(a_params['/0/b'].data.ctypes.data,
b_params['/0/b'].data.ctypes.data)
# Pointers to tail parameters must be different
self.assertNotEqual(a_params['/1/W'].data.ctypes.data,
b_params['/1/W'].data.ctypes.data)
self.assertNotEqual(a_params['/1/b'].data.ctypes.data,
b_params['/1/b'].data.ctypes.data)
def __init__(self, model_path, word_dim=None, afix_dim=None, nlayers=2,
hidden_dim=128, elu_dim=64, dep_dim=100, dropout_ratio=0.5, use_cudnn=False):
self.model_path = model_path
defs_file = model_path + "/tagger_defs.txt"
if word_dim is None:
self.train = False
Param.load(self, defs_file)
self.extractor = FeatureExtractor(model_path)
else:
self.train = True
p = Param(self)
p.dep_dim = dep_dim
p.word_dim = word_dim
p.afix_dim = afix_dim
p.hidden_dim = hidden_dim
p.elu_dim = elu_dim
p.nlayers = nlayers
p.n_words = len(read_model_defs(model_path + "/words.txt"))
p.n_suffixes = len(read_model_defs(model_path + "/suffixes.txt"))
p.n_prefixes = len(read_model_defs(model_path + "/prefixes.txt"))
p.targets = read_model_defs(model_path + "/target.txt")
p.dump(defs_file)
self.in_dim = self.word_dim + 8 * self.afix_dim
self.dropout_ratio = dropout_ratio
super(QRNNParser, self).__init__(
emb_word=L.EmbedID(self.n_words, self.word_dim, ignore_label=IGNORE),
emb_suf=L.EmbedID(self.n_suffixes, self.afix_dim, ignore_label=IGNORE),
emb_prf=L.EmbedID(self.n_prefixes, self.afix_dim, ignore_label=IGNORE),
qrnn_fs=ChainList(),
qrnn_bs=ChainList(),
arc_dep=L.Linear(2 * self.hidden_dim, self.dep_dim),
arc_head=L.Linear(2 * self.hidden_dim, self.dep_dim),
rel_dep=L.Linear(2 * self.hidden_dim, self.dep_dim),
rel_head=L.Linear(2 * self.hidden_dim, self.dep_dim),
biaffine_arc=Biaffine(self.dep_dim),
biaffine_tag=Bilinear(self.dep_dim, self.dep_dim, len(self.targets))
)
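    # Stack nlayers forward/backward QRNN layers into the qrnn_fs/qrnn_bs
    # ChainLists; the first layer takes the embedding width, later ones hidden_dim.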
in_dim = self.in_dim
for _ in range(self.nlayers):
self.qrnn_fs.add_link(QRNNLayer(in_dim, self.hidden_dim))
self.qrnn_bs.add_link(QRNNLayer(in_dim, self.hidden_dim))
in_dim = self.hidden_dim
# in_dim += self.hidden_dim