def make_model(self, env):
    n_dim_obs = env.observation_space.low.size
    n_dim_action = env.action_space.low.size
    n_hidden_channels = 50
    policy = policies.FCGaussianPolicy(
        n_input_channels=n_dim_obs,
        n_hidden_layers=2,
        n_hidden_channels=n_hidden_channels,
        action_size=n_dim_action,
        min_action=env.action_space.low,
        max_action=env.action_space.high)
    q_func = q_function.FCSAQFunction(
        n_dim_obs=n_dim_obs,
        n_dim_action=n_dim_action,
        n_hidden_layers=2,
        n_hidden_channels=n_hidden_channels)
    return chainer.Chain(policy=policy, q_function=q_func)

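This factory returns a plain container Chain whose children are reachable as attributes. A minimal usage sketch, assuming a gym-style environment; 'agent' is a stand-in for whatever object owns make_model, neither is shown in the snippet:

import gym

env = gym.make('Pendulum-v0')    # hypothetical continuous-action task
model = agent.make_model(env)    # 'agent' stands in for the owner of make_model
policy, q_func = model.policy, model.q_function   # children registered by name
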
def __init__(self, d, f, R, gpu):
    self.d = d
    self.f = f
    self.R = R
    self.gpu = gpu
    g = ChainList(*[L.Linear(1, f) for i in six.moves.range(AtomIdMax)])
    H = ChainList(*[L.Linear(f, f) for i in six.moves.range(R)])
    W = ChainList(*[L.Linear(f, d) for i in six.moves.range(R + 1)])
    self.optimizer = optimizers.Adam()
    self.model = Chain(H=H, W=W, g=g)
    if gpu:
        self.model.to_gpu(0)
    self.optimizer.setup(self.model)
    self.to = [[] for i in six.moves.range(2)]
    self.atom_sid = [[] for i in six.moves.range(2)]
    self.anum = [[] for i in six.moves.range(2)]

def test_addgrads(self):
    l1 = chainer.Link(x=(2, 3))
    l2 = chainer.Link(x=2)
    l3 = chainer.Link(x=3)
    c1 = chainer.Chain(l1=l1, l2=l2)
    c2 = chainer.Chain(c1=c1, l3=l3)
    l1.x.grad.fill(1)
    l2.x.grad.fill(2)
    l3.x.grad.fill(3)
    self.l1.x.grad.fill(-1)
    self.l2.x.grad.fill(-2)
    self.l3.x.grad.fill(-3)
    self.c2.addgrads(c2)
    numpy.testing.assert_array_equal(self.l1.x.grad, numpy.zeros((2, 3)))
    numpy.testing.assert_array_equal(self.l2.x.grad, numpy.zeros(2))
    numpy.testing.assert_array_equal(self.l3.x.grad, numpy.zeros(3))

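The test above relies on fixtures (self.l1 and friends) created in setUp. A self-contained sketch of the same Chain.addgrads semantics, using the same old-style Chainer v1 constructor API as the test: gradients of a structurally identical chain are added, link by link, into the receiver.

import numpy
import chainer

a = chainer.Chain(l1=chainer.Link(x=(2,)))
b = chainer.Chain(l1=chainer.Link(x=(2,)))
a.l1.x.grad.fill(1)   # this chain's gradients
b.l1.x.grad.fill(2)   # gradients accumulated elsewhere (e.g. another worker)
a.addgrads(b)         # a's grads become 1 + 2 = 3; b is left untouched
numpy.testing.assert_array_equal(a.l1.x.grad, numpy.full(2, 3.0))
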
def save(self, dir_name):
    dir_path = os.path.join(self._root_dir_path, dir_name)
    if not os.path.exists(dir_path):
        os.mkdir(dir_path)

    others = []
    for key, value in self.items():
        if key.startswith('_'):
            continue
        if isinstance(value, (np.ndarray, list)):
            np.save(os.path.join(dir_path, key + ".npy"), value)
        elif isinstance(value, (chainer.Chain, chainer.ChainList)):
            model_path = os.path.join(dir_path, "model.npz")
            chainer.serializers.save_npz(model_path, value)
        elif isinstance(value, chainer.Optimizer):
            optimizer_path = os.path.join(dir_path, "optimizer.npz")
            chainer.serializers.save_npz(optimizer_path, value)
        else:
            others.append("{}: {}".format(key, value))

    with open(os.path.join(dir_path, "log.txt"), "a") as f:
        text = "\n".join(others) + "\n"
        f.write(text)

def __init__(self, compute_accuracy=True,
             lossfun=softmax_cross_entropy.softmax_cross_entropy,
             branchweight=1, branchweights=None,
             ent_T=0.1, ent_Ts=None,
             accfun=accuracy.accuracy):
    super(Chain, self).__init__()
    # branchweights = [1]*7 + [1000]
    self.lossfun = lossfun
    if branchweight is not None and branchweights is None:
        self.branchweights = [branchweight]
    else:
        self.branchweights = branchweights
    if ent_T is not None and ent_Ts is None:
        self.ent_Ts = [ent_T]
    else:
        self.ent_Ts = ent_Ts
    self.accfun = accfun
    self.y = None
    self.loss = None
    self.accuracy = None
    self.compute_accuracy = compute_accuracy

def __call__(self, y, a, ht, y_lex):
    y_dict = F.squeeze(F.batch_matmul(y_lex, a, transa=True), axis=2)
    return (y + F.log(y_dict + self.alpha))

# class LinearInterpolationLexicon(chainer.Chain):
#     def __init__(self, hidden_size):
#         super(LinearInterpolationLexicon, self).__init__(
#             perceptron = chainer.links.Linear(hidden_size, 1)
#         )
#
#     def __call__(self, y, a, ht, y_lex):
#         y = F.softmax(y)
#         y_dict = F.squeeze(F.batch_matmul(y_lex, a, transa=True), axis=2)
#         gamma = F.broadcast_to(F.sigmoid(self.perceptron(ht)), y_dict.data.shape)
#         return (gamma * y_dict + (1 - gamma) * y)
#

def __init__(self):
    self.dtype = np.float16
    W = initializers.HeNormal(1 / np.sqrt(2), self.dtype)
    bias = initializers.Zero(self.dtype)
    chainer.Chain.__init__(
        self,
        conv1=L.Convolution2D(None, 96, 11,
                              stride=4, initialW=W, bias=bias),
        conv2=L.Convolution2D(None, 256, 5, pad=2, initialW=W, bias=bias),
        conv3=L.Convolution2D(None, 384, 3, pad=1, initialW=W, bias=bias),
        conv4=L.Convolution2D(None, 384, 3, pad=1, initialW=W, bias=bias),
        conv5=L.Convolution2D(None, 256, 3, pad=1, initialW=W, bias=bias),
        fc6=L.Linear(None, 4096, initialW=W, bias=bias),
        fc7=L.Linear(None, 4096, initialW=W, bias=bias),
        fc8=L.Linear(None, 1000, initialW=W, bias=bias),
    )
    self.train = True

def main():
    class PoleModel(Chain):
        def __init__(self, input_num, action_num):
            print(input_num, action_num)
            super(PoleModel, self).__init__(
                l1=L.Linear(input_num, 32),
                l2=L.Linear(32, 32),
                l3=L.Linear(32, action_num)
            )

        def q_function(self, state):
            h1 = F.leaky_relu(self.l1(state))
            h2 = F.leaky_relu(self.l2(h1))
            return self.l3(h2)

    dqn = DeepQNet(state_shape=(3, 32, 32), action_num=2, image_num_per_state=12,
                   model=PoleModel(3 * 12 * 32 * 32, action_num=2))

def __init__(self, *args):
    super(Sequential, self).__init__()
    assert len(args) > 0
    assert not hasattr(self, "layers")

    if len(args) == 1 and isinstance(args[0], OrderedDict):
        self.layers = args[0].values()
        with self.init_scope():
            for key, layer in args[0].items():
                if isinstance(layer, (chainer.Link, chainer.Chain, chainer.ChainList)):
                    setattr(self, key, layer)
    else:
        self.layers = args
        with self.init_scope():
            for idx, layer in enumerate(args):
                if isinstance(layer, (chainer.Link, chainer.Chain, chainer.ChainList)):
                    setattr(self, str(idx), layer)

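A quick usage sketch for this Sequential wrapper (the layer sizes are hypothetical; only the construction path shown above is exercised): with an OrderedDict the keys become attribute names, while positional links are registered under their indices.

from collections import OrderedDict
import chainer.links as L

model = Sequential(OrderedDict([
    ('fc1', L.Linear(784, 100)),
    ('fc2', L.Linear(100, 10)),
]))
print(model.fc1)                 # links are reachable by their dict keys
model2 = Sequential(L.Linear(784, 100), L.Linear(100, 10))  # keys '0', '1'
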
def __init__(self):
    chainer.Chain.__init__(self)
    self.dtype = np.float16
    W = initializers.HeNormal(1 / np.sqrt(2), self.dtype)
    bias = initializers.Zero(self.dtype)
    with self.init_scope():
        self.conv1 = L.Convolution2D(None, 96, 11, stride=4,
                                     initialW=W, initial_bias=bias)
        self.conv2 = L.Convolution2D(None, 256, 5, pad=2,
                                     initialW=W, initial_bias=bias)
        self.conv3 = L.Convolution2D(None, 384, 3, pad=1,
                                     initialW=W, initial_bias=bias)
        self.conv4 = L.Convolution2D(None, 384, 3, pad=1,
                                     initialW=W, initial_bias=bias)
        self.conv5 = L.Convolution2D(None, 256, 3, pad=1,
                                     initialW=W, initial_bias=bias)
        self.fc6 = L.Linear(None, 4096, initialW=W, initial_bias=bias)
        self.fc7 = L.Linear(None, 4096, initialW=W, initial_bias=bias)
        self.fc8 = L.Linear(None, 1000, initialW=W, initial_bias=bias)

def layer_params(layer, param_name, attr_name):
    """Return parameters in a flattened array from the given layer or an
    empty array if the parameters are not found.

    Args:
        layer (~chainer.Link): The layer from which parameters are collected.
        param_name (str): Name of the parameter, ``'W'`` or ``'b'``.
        attr_name (str): Name of the attribute, ``'data'`` or ``'grad'``.

    Returns:
        array: Flattened array of parameters.
    """
    if isinstance(layer, chainer.Chain):
        # Nested chainer.Chain, aggregate all underlying statistics
        return layers_params(layer, param_name, attr_name)
    elif not hasattr(layer, param_name):
        return layer.xp.array([])
    params = getattr(layer, param_name)
    params = getattr(params, attr_name)
    return params.flatten()

def layers_params(model, param_name, attr_name):
    """Return all parameters in a flattened array from the given model.

    Args:
        model (~chainer.Chain): The model from which parameters are collected.
        param_name (str): Name of the parameter, ``'W'`` or ``'b'``.
        attr_name (str): Name of the attribute, ``'data'`` or ``'grad'``.

    Returns:
        array: Flattened array of parameters.
    """
    xp = model.xp
    params = xp.array([], dtype=xp.float32)
    for param in model.params():
        if param.name == param_name:
            values = getattr(param, attr_name)
            values = values.flatten()
            params = xp.concatenate((params, values))  # Slow?
    return params

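A hedged usage sketch for the two helpers above (the toy chain is an assumption, not from the source): collect every weight value of a model, or one link's bias values, as flat arrays for logging parameter statistics.

import chainer
import chainer.links as L

model = chainer.Chain(fc1=L.Linear(4, 3), fc2=L.Linear(3, 2))
all_W = layers_params(model, 'W', 'data')     # all weights, flattened
fc1_b = layer_params(model.fc1, 'b', 'data')  # one link's bias values
print(all_W.shape, fc1_b.shape)               # (18,) (3,)
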
def __init__(self, n_units, n_out):
    super(MLP, self).__init__(
        # the size of the inputs to each layer will be inferred
        l1=L.Linear(None, n_units),  # n_in -> n_units
        l2=L.Linear(None, n_units),  # n_units -> n_units
        l3=L.Linear(None, n_out),    # n_units -> n_out
    )

# To use the static graph feature, just add the `@static_graph` decorator to the
# `__call__()` method of a Chain.
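A minimal sketch of that decorator in use, assuming Chainer v5's experimental chainer.static_graph feature (the class below is illustrative, not from the snippet above):

import chainer
import chainer.functions as F
import chainer.links as L
from chainer import static_graph

class StaticMLP(chainer.Chain):
    def __init__(self, n_units, n_out):
        super(StaticMLP, self).__init__()
        with self.init_scope():
            self.l1 = L.Linear(None, n_units)
            self.l2 = L.Linear(None, n_out)

    @static_graph  # trace the graph once, replay the optimized schedule later
    def __call__(self, x):
        h = F.relu(self.l1(x))
        return self.l2(h)
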
def get_state(chain):
    assert isinstance(chain, (chainer.Chain, chainer.ChainList))
    state = []
    for l in chain.children():
        if isinstance(l, chainer.links.LSTM):
            state.append((l.c, l.h))
        elif isinstance(l, Recurrent):
            state.append(l.get_state())
        elif isinstance(l, (chainer.Chain, chainer.ChainList)):
            state.append(get_state(l))
        else:
            state.append(None)
    return state

def stateful_links(chain):
    for l in chain.children():
        if isinstance(l, (chainer.links.LSTM, Recurrent)):
            yield l
        elif isinstance(l, (chainer.Chain, chainer.ChainList)):
            for m in stateful_links(l):
                yield m

def set_state(chain, state):
    assert isinstance(chain, (chainer.Chain, chainer.ChainList))
    for l, s in zip(chain.children(), state):
        if isinstance(l, chainer.links.LSTM):
            c, h = s
            # LSTM.set_state doesn't accept None state
            if c is not None:
                l.set_state(c, h)
        elif isinstance(l, Recurrent):
            l.set_state(s)
        elif isinstance(l, (chainer.Chain, chainer.ChainList)):
            set_state(l, s)
        else:
            assert s is None

def reset_state(chain):
    assert isinstance(chain, (chainer.Chain, chainer.ChainList))
    for l in chain.children():
        if isinstance(l, chainer.links.LSTM):
            l.reset_state()
        elif isinstance(l, Recurrent):
            l.reset_state()
        elif isinstance(l, (chainer.Chain, chainer.ChainList)):
            reset_state(l)

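Together these helpers snapshot and restore recurrent state across arbitrarily nested chains. A hedged usage sketch (the one-LSTM chain is illustrative; Recurrent is whatever interface the source defines):

import chainer
import chainer.links as L

rnn = chainer.Chain(lstm=L.LSTM(10, 10))
# ... run rnn.lstm forward so its c/h state is populated ...
saved = get_state(rnn)   # one entry per child, here [(c, h)]
reset_state(rnn)         # clear state, e.g. at an episode boundary
set_state(rnn, saved)    # resume later from the snapshot
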
def __init__(self, n_input_channels, n_hidden_layers,
             n_hidden_channels, action_size,
             min_action=None, max_action=None, bound_action=True,
             nonlinearity=F.relu,
             last_wscale=1.):
    self.n_input_channels = n_input_channels
    self.n_hidden_layers = n_hidden_layers
    self.n_hidden_channels = n_hidden_channels
    self.action_size = action_size
    self.min_action = min_action
    self.max_action = max_action
    self.bound_action = bound_action

    if self.bound_action:
        def action_filter(x):
            return bound_by_tanh(
                x, self.min_action, self.max_action)
    else:
        action_filter = None

    model = chainer.Chain(
        fc=MLP(self.n_input_channels,
               n_hidden_channels,
               (self.n_hidden_channels,) * self.n_hidden_layers,
               nonlinearity=nonlinearity,
               ),
        lstm=L.LSTM(n_hidden_channels, n_hidden_channels),
        out=L.Linear(n_hidden_channels, action_size,
                     initialW=LeCunNormal(last_wscale)),
    )

    def model_call(model, x):
        h = nonlinearity(model.fc(x))
        h = model.lstm(h)
        h = model.out(h)
        return h

    super().__init__(
        model=model,
        model_call=model_call,
        action_filter=action_filter)

def remove_link(self, name):
    """Remove the link with the given name from this model.

    The optimizer consults ``Chain.namedparams()`` to know which
    parameters should be updated, and ``namedparams()`` in turn walks
    ``self._children`` to get the names of all links registered in the
    chain, so removing the name here hides the link from both.
    """
    self._children.remove(name)

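A hedged sketch of why one would call it (Model here is a hypothetical Chain subclass defining remove_link as above): after dropping a head, re-running optimizer.setup leaves the removed parameters out of the update.

model = Model(fc1=L.Linear(784, 100), head=L.Linear(100, 10))
model.remove_link('head')    # 'head' no longer shows up in namedparams()
optimizer = chainer.optimizers.Adam()
optimizer.setup(model)       # only fc1's parameters will be updated
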
def __init__(self, **kwargs):
    # Initialize chainer.Chain first; without this the links are never
    # registered, and model.to_gpu() does not work.
    super(Model, self).__init__(**kwargs)
    self.nz_save_model_epoch = 0
    self.nz_save_optimizer_epoch = 0
    self.nz_xp = self._check_cupy()
    self.nz_flag_computational_graph = False

def __init__(self, d, f, R):
    self.d = d
    self.f = f
    self.R = R
    g = ChainList(*[L.Linear(1, f) for i in six.moves.range(AtomIdMax)])
    H = ChainList(*[ChainList(*[L.Linear(f, f)
                                for i in six.moves.range(R)])
                    for j in six.moves.range(5)])
    W = ChainList(*[L.Linear(f, d) for i in six.moves.range(R)])
    self.model = Chain(H=H, W=W, g=g)
    self.optimizer = optimizers.Adam()
    self.optimizer.setup(self.model)

def __init__(self, **links):
    super(Chain, self).__init__()
    self._children = []
    for name, link in six.iteritems(links):
        self.add_link(name, link)

def copy(self):
    ret = super(Chain, self).copy()
    ret._children = list(ret._children)
    d = ret.__dict__
    for name in ret._children:
        # copy child links recursively
        copied = d[name].copy()
        copied.name = name
        d[name] = copied
    return ret

def to_cpu(self):
    super(Chain, self).to_cpu()
    d = self.__dict__
    for name in self._children:
        d[name].to_cpu()
    return self

def to_gpu(self, device=None):
    with cuda.get_device(device):
        super(Chain, self).to_gpu()
        d = self.__dict__
        for name in self._children:
            d[name].to_gpu()
    return self

def params(self):
    for param in super(Chain, self).params():
        yield param
    d = self.__dict__
    for name in self._children:
        for param in d[name].params():
            yield param

def copyparams(self, link):
    super(Chain, self).copyparams(link)
    src = link.__dict__
    dst = self.__dict__
    for name in self._children:
        dst[name].copyparams(src[name])

def zerograds(self):
    super(Chain, self).zerograds()
    d = self.__dict__
    for name in self._children:
        d[name].zerograds()

def addgrads(self, link):
    super(Chain, self).addgrads(link)
    src = link.__dict__
    dst = self.__dict__
    for name in self._children:
        dst[name].addgrads(src[name])

def serialize(self, serializer):
    super(Chain, self).serialize(serializer)
    d = self.__dict__
    for name in self._children:
        d[name].serialize(serializer[name])

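To close the loop, a short sketch of how this recursive serialize is driven in practice (standard Chainer serializers; the tiny chain is an assumption): each child link is stored under its registered name.

import chainer
import chainer.links as L
from chainer import serializers

chain = chainer.Chain(fc=L.Linear(3, 2))   # old-style kwargs registration
serializers.save_npz('chain.npz', chain)   # keys look like 'fc/W', 'fc/b'
serializers.load_npz('chain.npz', chain)   # serialize() recurses per child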