def make_testable(train_model_path):
# load the train net prototxt as a protobuf message
with open(train_model_path) as f:
train_str = f.read()
train_net = caffe_pb2.NetParameter()
text_format.Merge(train_str, train_net)
# add the mean, var top blobs to all BN layers
for layer in train_net.layer:
if layer.type == "BN" and len(layer.top) == 1:
layer.top.append(layer.top[0] + "-mean")
layer.top.append(layer.top[0] + "-var")
# remove the test data layer if present
if train_net.layer[1].name == "data" and train_net.layer[1].include:
train_net.layer.remove(train_net.layer[1])
if train_net.layer[0].include:
# remove the 'include {phase: TRAIN}' layer param
train_net.layer[0].include.remove(train_net.layer[0].include[0])
return train_net
python类NetParameter()的实例源码
def make_testable(train_model_path):
# load the train net prototxt as a protobuf message
with open(train_model_path) as f:
train_str = f.read()
train_net = caffe_pb2.NetParameter()
text_format.Merge(train_str, train_net)
# add the mean, var top blobs to all BN layers
for layer in train_net.layer:
if layer.type == "BN" and len(layer.top) == 1:
layer.top.append(layer.top[0] + "-mean")
layer.top.append(layer.top[0] + "-var")
# remove the test data layer if present
if train_net.layer[1].name == "data" and train_net.layer[1].include:
train_net.layer.remove(train_net.layer[1])
if train_net.layer[0].include:
# remove the 'include {phase: TRAIN}' layer param
train_net.layer[0].include.remove(train_net.layer[0].include[0])
return train_net
def make_testable(train_model_path):
# load the train net prototxt as a protobuf message
with open(train_model_path) as f:
train_str = f.read()
train_net = caffe_pb2.NetParameter()
text_format.Merge(train_str, train_net)
# add the mean, var top blobs to all BN layers
for layer in train_net.layer:
if layer.type == "BN" and len(layer.top) == 1:
layer.top.append(layer.top[0] + "-mean")
layer.top.append(layer.top[0] + "-var")
# remove the test data layer if present
if train_net.layer[1].name == "data" and train_net.layer[1].include:
train_net.layer.remove(train_net.layer[1])
if train_net.layer[0].include:
# remove the 'include {phase: TRAIN}' layer param
train_net.layer[0].include.remove(train_net.layer[0].include[0])
return train_net
def _load_src_params_plain(self, pretrained_model):
""" Load parameters from the source model
All parameters are saved in a dictionary where
the keys are the original layer names
"""
# load pretrained model
with open(pretrained_model, 'rb') as f:
binary_content = f.read()
model = caffe_pb2.NetParameter()
model.ParseFromString(binary_content)
layers = model.layer
src_params = {}
for lc in layers:
name = lc.name
src_params[name] = [np.reshape(np.array(lc.blobs[i].data), lc.blobs[i].shape.dim) for i in xrange(len(lc.blobs))]
# if len(lc.blobs) >= 2:
# src_params[name] = [np.reshape(np.array(lc.blobs[0].data), lc.blobs[0].shape.dim),
# np.reshape(np.array(lc.blobs[1].data), lc.blobs[1].shape.dim)]
return src_params
def make_testable(train_model_path):
# load the train net prototxt as a protobuf message
with open(train_model_path) as f:
train_str = f.read()
train_net = caffe_pb2.NetParameter()
text_format.Merge(train_str, train_net)
# add the mean, var top blobs to all BN layers
for layer in train_net.layer:
if layer.type == "BN" and len(layer.top) == 1:
layer.top.append(layer.top[0] + "-mean")
layer.top.append(layer.top[0] + "-var")
# remove the test data layer if present
if train_net.layer[1].name == "data" and train_net.layer[1].include:
train_net.layer.remove(train_net.layer[1])
if train_net.layer[0].include:
# remove the 'include {phase: TRAIN}' layer param
train_net.layer[0].include.remove(train_net.layer[0].include[0])
return train_net
def make_testable(train_model_path):
# load the train net prototxt as a protobuf message
print "hello"
with open(train_model_path) as f:
train_str = f.read()
train_net = caffe_pb2.NetParameter()
text_format.Merge(train_str, train_net)
# add the mean, var top blobs to all BN layers
for layer in train_net.layer:
print(len(layer.top))
if layer.type == "BN" and len(layer.top) == 1:
layer.top.append(layer.top[0] + "-mean")
layer.top.append(layer.top[0] + "-var")
# remove the test data layer if present
if train_net.layer[1].name == "data" and train_net.layer[1].include:
train_net.layer.remove(train_net.layer[1])
if train_net.layer[0].include:
# remove the 'include {phase: TRAIN}' layer param
train_net.layer[0].include.remove(train_net.layer[0].include[0])
return train_net
def make_testable(train_model_path):
# load the train net prototxt as a protobuf message
with open(train_model_path) as f:
train_str = f.read()
train_net = caffe_pb2.NetParameter()
text_format.Merge(train_str, train_net)
# add the mean, var top blobs to all BN layers
for layer in train_net.layer:
#print layer.type
#print type(layer.top)
if layer.type == "BN" and len(layer.top) == 1:
layer.top.append(layer.top[0] + "-mean")
layer.top.append(layer.top[0] + "-var")
# remove the test data layer if present
if train_net.layer[1].name == "data" and train_net.layer[1].include:
train_net.layer.remove(train_net.layer[1])
if train_net.layer[0].include:
# remove the 'include {phase: TRAIN}' layer param
train_net.layer[0].include.remove(train_net.layer[0].include[0])
return train_net
def make_testable(train_model_path):
# load the train net prototxt as a protobuf message
with open(train_model_path) as f:
train_str = f.read()
train_net = caffe_pb2.NetParameter()
text_format.Merge(train_str, train_net)
# add the mean, var top blobs to all BN layers
for layer in train_net.layer:
if layer.type == "BN" and len(layer.top) == 1:
layer.top.append(layer.top[0] + "-mean")
layer.top.append(layer.top[0] + "-var")
# remove the test data layer if present
if train_net.layer[1].name == "data" and train_net.layer[1].include:
train_net.layer.remove(train_net.layer[1])
if train_net.layer[0].include:
# remove the 'include {phase: TRAIN}' layer param
train_net.layer[0].include.remove(train_net.layer[0].include[0])
return train_net
def make_testable(train_model_path):
# load the train net prototxt as a protobuf message
with open(train_model_path) as f:
train_str = f.read()
train_net = caffe_pb2.NetParameter()
text_format.Merge(train_str, train_net)
# add the mean, var top blobs to all BN layers
for layer in train_net.layer:
if layer.type == "BN" and len(layer.top) == 1:
layer.top.append(layer.top[0] + "-mean")
layer.top.append(layer.top[0] + "-var")
# remove the test data layer if present
if train_net.layer[1].name == "data" and train_net.layer[1].include:
train_net.layer.remove(train_net.layer[1])
if train_net.layer[0].include:
# remove the 'include {phase: TRAIN}' layer param
train_net.layer[0].include.remove(train_net.layer[0].include[0])
return train_net
def load(self, filename, bgr_to_rgb=True):
"""Load weights from a .caffemodel file and initialize counters.
Params:
filename: caffemodel file.
"""
print('Loading Caffe file:', filename)
caffemodel_params = caffe_pb2.NetParameter()
caffemodel_str = open(filename, 'rb').read()
caffemodel_params.ParseFromString(caffemodel_str)
self.caffe_layers = caffemodel_params.layer
# Layers collection.
self.layers['convolution'] = [i for i, l in enumerate(self.caffe_layers)
if l.type == 'Convolution']
self.layers['l2_normalization'] = [i for i, l in enumerate(self.caffe_layers)
if l.type == 'Normalize']
# BGR to RGB convertion. Tries to find the first convolution with 3
# and exchange parameters.
if bgr_to_rgb:
self.bgr_to_rgb = 1
def parse_caffemodel(caffemodel):
model = caffe_pb2.NetParameter()
print 'Loading caffemodel: ', caffemodel
with open(caffemodel, 'rb') as fp:
model.ParseFromString(fp.read())
return model
def get_netparameter(model):
with open(model) as f:
net = cp.NetParameter()
pb.text_format.Parse(f.read(), net)
return net
def _load_layer_types(prototxt):
# Read prototxt with caffe protobuf definitions
layers = caffe_pb2.NetParameter()
with open(prototxt, 'r') as f:
text_format.Merge(str(f.read()), layers)
# Assign layer parameters to type dictionary
types = OrderedDict()
for i in range(len(layers.layer)):
types[layers.layer[i].name] = layers.layer[i].type
return types
def parse_caffemodel(caffemodel):
model = caffe_pb2.NetParameter()
print 'Loading caffemodel: ', caffemodel
with open(caffemodel, 'rb') as fp:
model.ParseFromString(fp.read())
return model
def get_netparameter(self, model):
with open(model) as f:
net = cp.NetParameter()
pb.text_format.Parse(f.read(), net)
return net
def drop_absorber_weights(model, net):
# load the prototxt file as a protobuf message
with open(model) as f:
str2 = f.read()
msg = caffe_pb2.NetParameter()
text_format.Merge(str2, msg)
# iterate over all layers of the network
for i, layer in enumerate(msg.layer):
if not layer.type == 'Python':
continue
conv_layer = msg.layer[i - 2].name # conv layers are always two layers behind dropout
# get some necessary sizes
kernel_size = 1
shape_of_kernel_blob = net.params[conv_layer][0].data.shape
number_of_feature_maps = list(shape_of_kernel_blob[0:1])
shape_of_kernel_blob = list(shape_of_kernel_blob[1:4])
for x in shape_of_kernel_blob:
kernel_size *= x
weight = copy_double(net.params[conv_layer][0].data)
bias = copy_double(net.params[conv_layer][1].data)
# get p from dropout layer
python_param_str = eval(msg.layer[i].python_param.param_str)
p = float(python_param_str['p'])
scale = 1/(1-p)
# manipulate the weights and biases over all feature maps:
for j in xrange(number_of_feature_maps[0]):
net.params[conv_layer][0].data[j] = weight[j] * scale
net.params[conv_layer][1].data[j] = bias[j] * scale
return net
def bn_absorber_prototxt(model):
# load the prototxt file as a protobuf message
with open(model) as k:
str1 = k.read()
msg1 = caffe_pb2.NetParameter()
text_format.Merge(str1, msg1)
# search for bn layer and remove them
for i, l in enumerate(msg1.layer):
if l.type == "BN":
if msg1.layer[i].name == 'bn0_1':
continue
if msg1.layer[i - 1].type == 'Deconvolution':
continue
msg1.layer.remove(l)
msg1.layer[i].bottom.append(msg1.layer[i-1].top[0])
if len(msg1.layer[i].bottom) == 2:
msg1.layer[i].bottom.remove(msg1.layer[i].bottom[0])
elif len(msg1.layer[i].bottom) == 3:
if ('bn' in msg1.layer[i].bottom[0]) is True: # to remove just the layers with 'bn' in the name
msg1.layer[i].bottom.remove(msg1.layer[i].bottom[0])
elif ('bn' in msg1.layer[i].bottom[1]) is True:
msg1.layer[i].bottom.remove(msg1.layer[i].bottom[1])
else:
raise Exception("no bottom blob with name 'bn' present in {} layer".format(msg1.layer[i]))
else:
raise Exception("bn absorber does not support more than 2 input blobs for layer {}"
.format(msg1.layer[i]))
if msg1.layer[i].type == 'Upsample':
temp = msg1.layer[i].bottom[0]
msg1.layer[i].bottom[0] = msg1.layer[i].bottom[1]
msg1.layer[i].bottom[1] = temp
# l.bottom.append(l.top[0]) #msg1.layer[i-1].top
return msg1
def add_bias_to_conv(model, weights, out_dir):
# load the prototxt file as a protobuf message
with open(model) as n:
str1 = n.read()
msg2 = caffe_pb2.NetParameter()
text_format.Merge(str1, msg2)
for l2 in msg2.layer:
if l2.type == "Convolution":
if l2.convolution_param.bias_term is False:
l2.convolution_param.bias_term = True
l2.convolution_param.bias_filler.type = 'constant'
l2.convolution_param.bias_filler.value = 0.0 # actually default value
model_temp = os.path.join(out_dir, "model_temp.prototxt")
print "Saving temp model..."
with open(model_temp, 'w') as m:
m.write(text_format.MessageToString(msg2))
net_src = caffe.Net(model, weights, caffe.TEST)
net_des = caffe.Net(model_temp, caffe.TEST)
for l3 in net_src.params.keys():
for i in range(len(net_src.params[l3])):
net_des.params[l3][i].data[:] = net_src.params[l3][i].data[:]
# save weights with bias
weights_temp = os.path.join(out_dir, "weights_temp.caffemodel")
print "Saving temp weights..."
net_des.save(weights_temp)
return model_temp, weights_temp
def draw_network(model, image_path):
""" Draw a network and save the graph in the specified image path
Args:
model (str): path to the prototxt file (model definition)
image_path (str): path where to save the image
"""
net = caffe_pb2.NetParameter()
text_format.Merge(open(model).read(), net)
caffe.draw.draw_net_to_file(net, image_path, 'BT')
# In[ ]:
def create_model(depth):
model = caffe_pb2.NetParameter()
model.name = 'ResNet_{}'.format(depth)
configs = {
50: [3, 4, 6, 3],
101: [3, 4, 23, 3],
152: [3, 8, 36, 3],
200: [3, 24, 36, 3],
}
num = configs[depth]
layers = []
layers.append(Data('data', ['data', 'label'],
'examples/imagenet/ilsvrc12_train_lmdb', 32, 'train'))
layers.append(Data('data', ['data', 'label'],
'examples/imagenet/ilsvrc12_val_lmdb', 25, 'test'))
layers.append(Conv('conv1', 'data', 64, 7, 2, 3))
layers.extend(Act('conv1', layers[-1].top[0]))
layers.append(Pool('pool1', layers[-1].top[0], 'max', 3, 2, 0))
layers.extend(ResLayer('conv2', layers[-1].top[0], num[0], 64, 1, 'first'))
layers.extend(ResLayer('conv3', layers[-1].top[0], num[1], 128, 2))
layers.extend(ResLayer('conv4', layers[-1].top[0], num[2], 256, 2))
layers.extend(ResLayer('conv5', layers[-1].top[0], num[3], 512, 2))
layers.extend(Act('conv5', layers[-1].top[0]))
layers.append(Pool('pool5', layers[-1].top[0], 'ave', 7, 1, 0))
layers.append(Linear('fc', layers[-1].top[0], 1000))
layers.append(Loss('loss', ['fc', 'label']))
layers.append(Accuracy('accuracy_top1', ['fc', 'label'], 1))
layers.append(Accuracy('accuracy_top5', ['fc', 'label'], 5))
model.layer.extend(layers)
return model
def __init__(self, name="network"):
self.net = caffe_pb2.NetParameter()
self.net.name = name
self.bottom = None
self.cur = None
self.this = None
def __init__(self, name="network", pt=None):
self.net = caffe_pb2.NetParameter()
if pt is None:
self.net.name = name
else:
with open(pt, 'rt') as f:
pb2.text_format.Merge(f.read(), self.net)
self.bottom = None
self.cur = None
self.this = None
self._layer = None
self._bottom = None
def drop_absorber_weights(model, net):
# load the prototxt file as a protobuf message
with open(model) as f:
str2 = f.read()
msg = caffe_pb2.NetParameter()
text_format.Merge(str2, msg)
# iterate over all layers of the network
for i, layer in enumerate(msg.layer):
if not layer.type == 'Python':
continue
conv_layer = msg.layer[i - 2].name # conv layers are always two layers behind dropout
# get some necessary sizes
kernel_size = 1
shape_of_kernel_blob = net.params[conv_layer][0].data.shape
number_of_feature_maps = list(shape_of_kernel_blob[0:1])
shape_of_kernel_blob = list(shape_of_kernel_blob[1:4])
for x in shape_of_kernel_blob:
kernel_size *= x
weight = copy_double(net.params[conv_layer][0].data)
bias = copy_double(net.params[conv_layer][1].data)
# get p from dropout layer
python_param_str = eval(msg.layer[i].python_param.param_str)
p = float(python_param_str['p'])
scale = 1/(1-p)
# manipulate the weights and biases over all feature maps:
for j in xrange(number_of_feature_maps[0]):
net.params[conv_layer][0].data[j] = weight[j] * scale
net.params[conv_layer][1].data[j] = bias[j] * scale
return net
def bn_absorber_prototxt(model):
# load the prototxt file as a protobuf message
with open(model) as k:
str1 = k.read()
msg1 = caffe_pb2.NetParameter()
text_format.Merge(str1, msg1)
# search for bn layer and remove them
for i, l in enumerate(msg1.layer):
if l.type == "BN":
if msg1.layer[i].name == 'bn0_1':
continue
if msg1.layer[i - 1].type == 'Deconvolution':
continue
msg1.layer.remove(l)
msg1.layer[i].bottom.append(msg1.layer[i-1].top[0])
if len(msg1.layer[i].bottom) == 2:
msg1.layer[i].bottom.remove(msg1.layer[i].bottom[0])
elif len(msg1.layer[i].bottom) == 3:
if ('bn' in msg1.layer[i].bottom[0]) is True: # to remove just the layers with 'bn' in the name
msg1.layer[i].bottom.remove(msg1.layer[i].bottom[0])
elif ('bn' in msg1.layer[i].bottom[1]) is True:
msg1.layer[i].bottom.remove(msg1.layer[i].bottom[1])
else:
raise Exception("no bottom blob with name 'bn' present in {} layer".format(msg1.layer[i]))
else:
raise Exception("bn absorber does not support more than 2 input blobs for layer {}"
.format(msg1.layer[i]))
if msg1.layer[i].type == 'Upsample':
temp = msg1.layer[i].bottom[0]
msg1.layer[i].bottom[0] = msg1.layer[i].bottom[1]
msg1.layer[i].bottom[1] = temp
# l.bottom.append(l.top[0]) #msg1.layer[i-1].top
return msg1
def add_bias_to_conv(model, weights, out_dir):
# load the prototxt file as a protobuf message
with open(model) as n:
str1 = n.read()
msg2 = caffe_pb2.NetParameter()
text_format.Merge(str1, msg2)
for l2 in msg2.layer:
if l2.type == "Convolution":
if l2.convolution_param.bias_term is False:
l2.convolution_param.bias_term = True
l2.convolution_param.bias_filler.type = 'constant'
l2.convolution_param.bias_filler.value = 0.0 # actually default value
model_temp = os.path.join(out_dir, "model_temp.prototxt")
print "Saving temp model..."
with open(model_temp, 'w') as m:
m.write(text_format.MessageToString(msg2))
net_src = caffe.Net(model, weights, caffe.TEST)
net_des = caffe.Net(model_temp, caffe.TEST)
for l3 in net_src.params.keys():
for i in range(len(net_src.params[l3])):
net_des.params[l3][i].data[:] = net_src.params[l3][i].data[:]
# save weights with bias
weights_temp = os.path.join(out_dir, "weights_temp.caffemodel")
print "Saving temp weights..."
net_des.save(weights_temp)
return model_temp, weights_temp
def draw_net(net_proto_file, out_img_file, style = 'TB'):
"""
draw cnn network into image.
IN: net_proto_file net definition file
IN: style 'TB' for top-> bottom, 'LR' for lelf->right
OUT: out_img_file output image
"""
net = caffe_pb2.NetParameter()
text_format.Merge(open(net_proto_file).read(), net)
if not net.name:
net.name = 'cnn_net'
print('\nDrawing net to %s' % out_img_file)
caffe.draw.draw_net_to_file(net, out_img_file, style)
def draw_net(net_proto_file, out_img_file, style = 'TB'):
"""
draw cnn network into image.
IN: net_proto_file net definition file
IN: style 'TB' for top-> bottom, 'LR' for lelf->right
OUT: out_img_file output image
"""
net = caffe_pb2.NetParameter()
text_format.Merge(open(net_proto_file).read(), net)
if not net.name:
net.name = 'cnn_net'
print('\nDrawing net to %s' % out_img_file)
caffe.draw.draw_net_to_file(net, out_img_file, style)
def draw_net(net_proto_file, out_img_file, style = 'TB'):
"""
draw cnn network into image.
IN: net_proto_file net definition file
IN: style 'TB' for top-> bottom, 'LR' for lelf->right
OUT: out_img_file output image
"""
net = caffe_pb2.NetParameter()
text_format.Merge(open(net_proto_file).read(), net)
if not net.name:
net.name = 'cnn_net'
print('\nDrawing net to %s' % out_img_file)
caffe.draw.draw_net_to_file(net, out_img_file, style)
def bn_absorber_prototxt(model):
# load the prototxt file as a protobuf message
with open(model) as k:
str1 = k.read()
msg1 = caffe_pb2.NetParameter()
text_format.Merge(str1, msg1)
# search for bn layer and remove them
for l in msg1.layer:
if l.type == "BN":
msg1.layer.remove(l)
return msg1
def __init__(self):
print('Loading Caffe file:', caffemodel_path)
caffemodel_params = caffe_pb2.NetParameter()
caffemodel_str = open(caffemodel_path, 'rb').read()
caffemodel_params.ParseFromString(caffemodel_str)
caffe_layers = caffemodel_params.layer
self.layers = []
self.counter = 0
self.bgr_to_rgb = False
for layer in caffe_layers:
if layer.type == 'Convolution':
self.layers.append(layer)