def __init__(self, inner_layer_arg, **kwargs):
# Initialise based on one of the three initialisation methods
# Case 1: Check if inner_layer_arg is conv_width
if isinstance(inner_layer_arg, (int, long)):
self.conv_width = inner_layer_arg
dense_layer_kwargs, kwargs = filter_func_args(layers.Dense.__init__,
kwargs, overrule_args=['name'])
self.create_inner_layer_fn = lambda: layers.Dense(self.conv_width, **dense_layer_kwargs)
# Case 2: Check if an initialised keras layer is given
elif isinstance(inner_layer_arg, layers.Layer):
        assert not inner_layer_arg.built, 'When initialising with a keras layer, it must not be built yet.'
_, self.conv_width = inner_layer_arg.get_output_shape_for((None, None))
# layer_from_config will mutate the config dict, therefore create a get fn
self.create_inner_layer_fn = lambda: layer_from_config(dict(
class_name=inner_layer_arg.__class__.__name__,
config=inner_layer_arg.get_config()))
    # Case 3: Check if a function is provided that returns an initialised keras layer
elif callable(inner_layer_arg):
example_instance = inner_layer_arg()
assert isinstance(example_instance, layers.Layer), 'When initialising with a function, the function has to return a keras layer'
        assert not example_instance.built, 'When initialising with a function, the returned layer must not be built yet.'
_, self.conv_width = example_instance.get_output_shape_for((None, None))
self.create_inner_layer_fn = inner_layer_arg
else:
        raise ValueError('NeuralGraphHidden has to be initialised with (1) an int conv_width, (2) a keras layer instance, or (3) a function returning a keras layer instance.')
super(NeuralGraphHidden, self).__init__(**kwargs)
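# Hedged usage sketch (not part of the original source): the three equivalent
# ways to construct the layer above; NeuralGraphHidden is assumed to be the
# class this __init__ belongs to, with keras layers imported as `layers`.
from keras import layers

hidden_a = NeuralGraphHidden(64, activation='relu')                        # Case 1: int conv_width
hidden_b = NeuralGraphHidden(layers.Dense(64, activation='relu'))          # Case 2: unbuilt layer instance
hidden_c = NeuralGraphHidden(lambda: layers.Dense(64, activation='relu'))  # Case 3: factory function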
def __init__(self, inner_layer_arg, **kwargs):
# Initialise based on one of the three initialisation methods
# Case 1: Check if inner_layer_arg is fp_length
if isinstance(inner_layer_arg, (int, long)):
self.fp_length = inner_layer_arg
dense_layer_kwargs, kwargs = filter_func_args(layers.Dense.__init__,
kwargs, overrule_args=['name'])
self.create_inner_layer_fn = lambda: layers.Dense(self.fp_length, **dense_layer_kwargs)
# Case 2: Check if an initialised keras layer is given
elif isinstance(inner_layer_arg, layers.Layer):
        assert not inner_layer_arg.built, 'When initialising with a keras layer, it must not be built yet.'
_, self.fp_length = inner_layer_arg.get_output_shape_for((None, None))
self.create_inner_layer_fn = lambda: inner_layer_arg
    # Case 3: Check if a function is provided that returns an initialised keras layer
elif callable(inner_layer_arg):
example_instance = inner_layer_arg()
assert isinstance(example_instance, layers.Layer), 'When initialising with a function, the function has to return a keras layer'
        assert not example_instance.built, 'When initialising with a function, the returned layer must not be built yet.'
_, self.fp_length = example_instance.get_output_shape_for((None, None))
self.create_inner_layer_fn = inner_layer_arg
else:
        raise ValueError('NeuralGraphOutput has to be initialised with (1) an int fp_length, (2) a keras layer instance, or (3) a function returning a keras layer instance.')
super(NeuralGraphOutput, self).__init__(**kwargs)
def find_activation_layer(layer, node_index):
"""
Args:
layer(Layer):
node_index:
"""
output_shape = layer.get_output_shape_at(node_index)
maybe_layer = layer
node = maybe_layer.inbound_nodes[node_index]
# Loop will be broken by an error if an output layer is encountered
while True:
# If maybe_layer has a nonlinear activation function return it and its index
activation = getattr(maybe_layer, 'activation', linear)
if activation.__name__ != 'linear':
            if maybe_layer.get_output_shape_at(node_index) != output_shape:
                raise ValueError('The activation layer ({0}) does not have the same'
                                 ' output shape as {1}'.format(maybe_layer.name,
                                                               layer.name))
return maybe_layer, node_index
# If not, move to the next layer in the datastream
next_nodes = get_shallower_nodes(node)
        # Abort if the graph branches into more than one downstream node
        if len(next_nodes) > 1:
            raise ValueError('The model must not branch between the chosen layer'
                             ' and the activation layer.')
node = next_nodes[0]
node_index = get_node_index(node)
maybe_layer = node.outbound_layer
        # If maybe_layer has weights (and is not a global pooling layer),
        # no nonlinear activation layer was found in between
        if maybe_layer.weights and (
                not maybe_layer.__class__.__name__.startswith('Global')):
            raise AttributeError('There is no nonlinear activation layer between {0}'
                                 ' and {1}'.format(layer.name, maybe_layer.name))
def copy_weights(src_model, dst_model, must_exist=True):
"""Copy weights from `src_model` to `dst_model`.
Parameters
----------
src_model
Keras source model.
dst_model
Keras destination model.
must_exist: bool
If `True`, raises `ValueError` if a layer in `dst_model` does not exist
in `src_model`.
Returns
-------
list
Names of layers that were copied.
"""
copied = []
    for dst_layer in dst_model.layers:
        # Look up the layer with the same name in the source model
        src_layer = None
        for layer in src_model.layers:
            if layer.name == dst_layer.name:
                src_layer = layer
                break
        if src_layer is None:
            if must_exist:
                raise ValueError('Layer "%s" not found!' % dst_layer.name)
            else:
                continue
dst_layer.set_weights(src_layer.get_weights())
copied.append(dst_layer.name)
return copied
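# Hedged usage sketch (not part of the original source): copy weights between two
# models whose layers share names; the architectures below are illustrative only.
from keras.models import Sequential
from keras.layers import Dense

src = Sequential([Dense(32, input_dim=10, name='hidden'), Dense(1, name='out')])
dst = Sequential([Dense(32, input_dim=10, name='hidden'), Dense(1, name='out')])
copied = copy_weights(src, dst)  # -> ['hidden', 'out']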
def WordLSTM(batch_size, d_W, d_L, V_W):
class WordMask(Layer):
def __init__(self, **kwargs):
super(WordMask, self).__init__(**kwargs)
def call(self, x, mask=None):
assert mask is None
return K.cast(K.any(x, axis=-1), K.floatx())
def get_output_shape_for(self, input_shape):
return input_shape
# inputs
x = Input(batch_shape=(batch_size, V_W.size), name='context')
# sub-models
input = Dense(d_W)
lm = LanguageModel(batch_size, d_W, d_L)
output = Dense(V_W.size)
# the actual word_lstm model
ctx = Dropout(.5)(input(x))
ctx_mask = WordMask()(x)
c = Dropout(.5)(lm([ctx, ctx_mask]))
y_logit = output(Dense(150)(c))
y = Activation('softmax')(y_logit)
word_lstm = Model(input=x, output=y)
word_lstm.get_hyperparams = lambda: (batch_size, d_W, d_L, V_W)
return word_lstm
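# Hedged sketch (not part of the original source): building the word-level LSTM
# above; `vocab` stands in for V_W, which is assumed to expose a `.size`
# attribute, and LanguageModel is assumed to be defined in the same module.
word_lstm = WordLSTM(batch_size=32, d_W=300, d_L=1024, V_W=vocab)
word_lstm.compile(optimizer='adam', loss='categorical_crossentropy')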
def reset_states(self, states=None):
if not self.stateful:
raise AttributeError('Layer must be stateful.')
if not self.input_spec:
raise RuntimeError('Layer has never been called '
'and thus has no states.')
batch_size = self.input_spec.shape[0]
if not batch_size:
raise ValueError('If a QRNN is stateful, it needs to know '
'its batch size. Specify the batch size '
'of your input tensors: \n'
'- If using a Sequential model, '
'specify the batch size by passing '
'a `batch_input_shape` '
'argument to your first layer.\n'
'- If using the functional API, specify '
'the time dimension by passing a '
'`batch_shape` argument to your Input layer.')
if self.states[0] is None:
self.states = [K.zeros((batch_size, self.units))
for _ in self.states]
elif states is None:
for state in self.states:
K.set_value(state, np.zeros((batch_size, self.units)))
else:
if not isinstance(states, (list, tuple)):
states = [states]
if len(states) != len(self.states):
            raise ValueError('Layer ' + self.name + ' expects ' +
                             str(len(self.states)) + ' states, '
                             'but it received ' + str(len(states)) +
                             ' state values. Input received: ' +
                             str(states))
for index, (value, state) in enumerate(zip(states, self.states)):
if value.shape != (batch_size, self.units):
raise ValueError('State ' + str(index) +
' is incompatible with layer ' +
self.name + ': expected shape=' +
str((batch_size, self.units)) +
', found shape=' + str(value.shape))
K.set_value(state, value)
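# Hedged sketch (not part of the original source): clearing the state of a
# stateful QRNN between independent sequences; it assumes the QRNN layer this
# method belongs to accepts `units`, `stateful` and `batch_input_shape` like
# the built-in Keras recurrent layers.
import numpy as np
from keras.models import Sequential

model = Sequential()
model.add(QRNN(64, stateful=True, batch_input_shape=(32, 10, 8)))
model.predict(np.random.rand(32, 10, 8), batch_size=32)
model.layers[0].reset_states()  # forget the state carried over from the last batch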
def call(self, inputs, mask=None, initial_state=None, training=None):
# input shape: `(samples, time (padded with zeros), input_dim)`
# note that the .build() method of subclasses MUST define
# self.input_spec and self.state_spec with complete input shapes.
if isinstance(inputs, list):
initial_states = inputs[1:]
inputs = inputs[0]
    elif initial_state is not None:
        # Normalise a user-supplied initial state into a list of tensors
        initial_states = list(initial_state) if isinstance(initial_state, (list, tuple)) else [initial_state]
elif self.stateful:
initial_states = self.states
else:
initial_states = self.get_initial_states(inputs)
if len(initial_states) != len(self.states):
raise ValueError('Layer has ' + str(len(self.states)) +
' states but was passed ' +
str(len(initial_states)) +
' initial states.')
input_shape = K.int_shape(inputs)
if self.unroll and input_shape[1] is None:
raise ValueError('Cannot unroll a RNN if the '
'time dimension is undefined. \n'
'- If using a Sequential model, '
'specify the time dimension by passing '
'an `input_shape` or `batch_input_shape` '
'argument to your first layer. If your '
'first layer is an Embedding, you can '
'also use the `input_length` argument.\n'
'- If using the functional API, specify '
'the time dimension by passing a `shape` '
'or `batch_shape` argument to your Input layer.')
constants = self.get_constants(inputs, training=None)
preprocessed_input = self.preprocess_input(inputs, training=None)
last_output, outputs, states = K.rnn(self.step, preprocessed_input,
initial_states,
go_backwards=self.go_backwards,
mask=mask,
constants=constants,
unroll=self.unroll,
input_length=input_shape[1])
if self.stateful:
updates = []
for i in range(len(states)):
updates.append((self.states[i], states[i]))
self.add_update(updates, inputs)
# Properly set learning phase
if 0 < self.dropout < 1:
last_output._uses_learning_phase = True
outputs._uses_learning_phase = True
if self.return_sequences:
return outputs
else:
return last_output
def __init__(self, output_dim, hidden_dim, output_length, depth=1,bidirectional=True, dropout=0.1, **kwargs):
if bidirectional and hidden_dim % 2 != 0:
raise Exception ("hidden_dim for AttentionSeq2seq should be even (Because of bidirectional RNN).")
super(AttentionSeq2seq, self).__init__()
if type(depth) not in [list, tuple]:
depth = (depth, depth)
if 'batch_input_shape' in kwargs:
shape = kwargs['batch_input_shape']
del kwargs['batch_input_shape']
elif 'input_shape' in kwargs:
shape = (None,) + tuple(kwargs['input_shape'])
del kwargs['input_shape']
elif 'input_dim' in kwargs:
if 'input_length' in kwargs:
input_length = kwargs['input_length']
else:
input_length = None
shape = (None, input_length, kwargs['input_dim'])
del kwargs['input_dim']
self.add(Layer(batch_input_shape=shape))
if bidirectional:
self.add(Bidirectional(LSTMEncoder(output_dim=int(hidden_dim / 2), state_input=False, return_sequences=True, **kwargs)))
else:
self.add(LSTMEncoder(output_dim=hidden_dim, state_input=False, return_sequences=True, **kwargs))
for i in range(0, depth[0] - 1):
self.add(Dropout(dropout))
if bidirectional:
self.add(Bidirectional(LSTMEncoder(output_dim=int(hidden_dim / 2), state_input=False, return_sequences=True, **kwargs)))
else:
self.add(LSTMEncoder(output_dim=hidden_dim, state_input=False, return_sequences=True, **kwargs))
encoder = self.layers[-1]
self.add(Dropout(dropout))
self.add(TimeDistributed(Dense(hidden_dim if depth[1] > 1 else output_dim)))
decoder = AttentionDecoder(hidden_dim=hidden_dim, output_length=output_length, state_input=False, **kwargs)
self.add(Dropout(dropout))
self.add(decoder)
for i in range(0, depth[1] - 1):
self.add(Dropout(dropout))
self.add(LSTMEncoder(output_dim=hidden_dim, state_input=False, return_sequences=True, **kwargs))
self.add(Dropout(dropout))
self.add(TimeDistributed(Dense(output_dim, activation='softmax')))
self.encoder = encoder
self.decoder = decoder
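# Hedged usage sketch (not part of the original source): building the attention
# seq2seq container defined above; all dimensions are illustrative.
model = AttentionSeq2seq(output_dim=20, hidden_dim=64, output_length=8,
                         input_shape=(12, 30), depth=2)
model.compile(loss='categorical_crossentropy', optimizer='rmsprop')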