def load_genome_json():
"""
Parses the genome json file. Returns the question dictionary and the
answer dictionary.
"""
qdic, adic = {}, {}
with open(config.DATA_PATHS['genome']['genome_file'], 'r') as f:
qdata = json.load(f)
for q in qdata:
key = 'genome' + QID_KEY_SEPARATOR + str(q['id'])
qdic[key] = {'qstr': q['question'], 'iid': q['image']}
adic[key] = [{'answer': q['answer']}]
    print('parsed', len(qdic), 'questions for genome')
return qdic, adic
def __init__(self, batchsize=64, max_length=15, mode='train'):
self.batchsize = batchsize
self.d_vocabulary = None
self.batch_index = None
self.batch_len = None
self.rev_adict = None
self.max_length = max_length
self.mode = mode
self.qdic, self.adic = VQADataProvider.load_data(mode)
with open('./result/vdict.json','r') as f:
self.vdict = json.load(f)
with open('./result/adict.json','r') as f:
self.adict = json.load(f)
self.n_ans_vocabulary = len(self.adict)
self.nlp = spacy.load('en', vectors='en_glove_cc_300_1m_vectors')
self.glove_dict = {} # word -> glove vector
def load_vqa_json(data_split):
"""
Parses the question and answer json files for the given data split.
Returns the question dictionary and the answer dictionary.
"""
qdic, adic = {}, {}
with open(config.DATA_PATHS[data_split]['ques_file'], 'r') as f:
qdata = json.load(f)['questions']
for q in qdata:
qdic[data_split + QID_KEY_SEPARATOR + str(q['question_id'])] = \
{'qstr': q['question'], 'iid': q['image_id']}
if 'test' not in data_split:
with open(config.DATA_PATHS[data_split]['ans_file'], 'r') as f:
adata = json.load(f)['annotations']
for a in adata:
adic[data_split + QID_KEY_SEPARATOR + str(a['question_id'])] = \
a['answers']
    print('parsed', len(qdic), 'questions for', data_split)
return qdic, adic
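# A minimal, self-contained sketch of how the returned dictionaries are keyed and
# looked up. QID_KEY_SEPARATOR is defined elsewhere in the repo; the separator value,
# the helper name, and the toy question/answer entries below are assumptions for
# illustration only.
def _example_qid_lookup():
    QID_KEY_SEPARATOR = '/'  # assumed separator, not the repo's actual value
    qdic = {'train' + QID_KEY_SEPARATOR + '42': {'qstr': 'What color is the cat?', 'iid': 123}}
    adic = {'train' + QID_KEY_SEPARATOR + '42': [{'answer': 'black'}]}
    qid = 'train' + QID_KEY_SEPARATOR + '42'
    # Join a question with its answer list via the shared key.
    print(qdic[qid]['qstr'], '->', [a['answer'] for a in adic[qid]])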
def test_individual_stability_matrix():
    """
    Tests individual_stability_matrix method on three gaussian blobs.
    """
    import utils
    import numpy as np

    desired = np.load(home + '/git_repo/PyBASC/tests/ism_test.npy')
    blobs = generate_blobs()
    ism = utils.individual_stability_matrix(blobs, 20, 3)
    # Compare the computed stability matrix against the stored reference result.
    np.testing.assert_equal(ism, desired)
def test_ndarray_to_vol():
    import basc
    import nibabel as nb

    subject_file = home + '/git_repo/PyBASC/sample_data/test.nii.gz'
    data = nb.load(subject_file).get_data().astype('float32')
    roi_mask_file = home + '/git_repo/PyBASC/masks/LC_Quarter_Res.nii.gz'
    print('Data Loaded')
    roi_mask_nparray = nb.load(roi_mask_file).get_data().astype('float32').astype('bool')
    # Pull out the voxels inside the ROI mask and write them back out as a NIfTI volume.
    data_array = data[roi_mask_nparray]
    filename = home + '/git_repo/PyBASC/sample_data/ndarray_to_vol_test.nii.gz'
    basc.ndarray_to_vol(data_array, roi_mask_file, roi_mask_file, filename)
def get_dataset(dataset_path='Data/Train_Data'):
    # Load the cached training arrays if they exist; otherwise build them from the raw images.
    try:
        X = np.load('Data/npy_train_data/X.npy')
        Y = np.load('Data/npy_train_data/Y.npy')
    except IOError:
        inputs_path = dataset_path + '/input'
        images = listdir(inputs_path)  # Getting images
        X = []
        Y = []
        for img in images:
            img_path = inputs_path + '/' + img
            x_img = get_img(img_path).astype('float32').reshape(64, 64, 3)
            x_img /= 255.
            y_img = get_img(img_path.replace('input/', 'mask/mask_')).astype('float32').reshape(64, 64, 1)
            y_img /= 255.
            X.append(x_img)
            Y.append(y_img)
        X = np.array(X)
        Y = np.array(Y)
        # Cache the dataset for later runs:
        if not os.path.exists('Data/npy_train_data/'):
            os.makedirs('Data/npy_train_data/')
        np.save('Data/npy_train_data/X.npy', X)
        np.save('Data/npy_train_data/Y.npy', Y)
    X, X_test, Y, Y_test = train_test_split(X, Y, test_size=0.1, random_state=42)
    return X, X_test, Y, Y_test
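# get_dataset above caches the preprocessed arrays as .npy files and only rebuilds them
# when loading fails. Below is a minimal, self-contained sketch of that cache-or-rebuild
# pattern; the helper name and the /tmp path are hypothetical, not part of the repo.
def _load_or_build(cache_path, build_fn):
    import os
    import numpy as np
    try:
        return np.load(cache_path)    # reuse the cached array if it exists
    except IOError:
        data = build_fn()             # otherwise build it from scratch
        os.makedirs(os.path.dirname(cache_path), exist_ok=True)
        np.save(cache_path, data)     # and cache it for the next call
        return data
# e.g. _load_or_build('/tmp/example_cache/X.npy', lambda: np.zeros((4, 64, 64, 3), dtype='float32'))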
def __init__(self,
saved_model=None,
train_folder=None,
feature=_feature.__func__):
"""
:param saved_model: optional saved train set and labels as .npz
:param train_folder: optional custom train data to process
:param feature: feature function - compatible with saved_model
"""
self.feature = feature
if train_folder is not None:
self.train_set, self.train_labels, self.model = \
self.create_model(train_folder)
else:
if cv2.__version__[0] == '2':
self.model = cv2.KNearest()
else:
self.model = cv2.ml.KNearest_create()
if saved_model is None:
saved_model = TRAIN_DATA+'raw_pixel_data.npz'
with np.load(saved_model) as data:
self.train_set = data['train_set']
self.train_labels = data['train_labels']
if cv2.__version__[0] == '2':
self.model.train(self.train_set, self.train_labels)
else:
self.model.train(self.train_set, cv2.ml.ROW_SAMPLE,
self.train_labels)
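# Minimal sketch of the OpenCV 3+ branch above: create a k-NN model, train it on toy
# float32 feature rows, and query it. The synthetic data, sizes, and function name are
# illustrative assumptions only.
def _knn_smoke_test():
    import cv2
    import numpy as np
    train_set = np.random.rand(20, 16).astype(np.float32)               # 20 samples, 16 features
    train_labels = np.random.randint(0, 3, size=(20, 1)).astype(np.float32)
    model = cv2.ml.KNearest_create()
    model.train(train_set, cv2.ml.ROW_SAMPLE, train_labels)
    _, results, _, _ = model.findNearest(train_set[:2], k=3)            # classify two known rows
    print(results.ravel())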
def load(self, model_filename):
self.__model = load_model("%s.model" % model_filename)
self.__chars = np.load("%s.cvocab.npy" % model_filename).tolist()
self.__trigrams = np.load("%s.tvocab.npy" % model_filename).tolist()
self.__classes = np.load("%s.classes.npy" % model_filename).tolist()
self.__char_indexes = dict((c, i) for i, c in enumerate(self.__chars))
self.__indexes_char = dict((i, c) for i, c in enumerate(self.__chars))
self.__trigrams_indexes = dict((t, i) for i, t in enumerate(self.__trigrams))
self.__indices_trigrams = dict((i, t) for i, t in enumerate(self.__trigrams))
self.__classes_indexes = dict((c, i) for i, c in enumerate(self.__classes))
self.__indexes_classes = dict((i, c) for i, c in enumerate(self.__classes))
def get_id_set(lang_codes):
feature_database = np.load("family_features.npz")
lang_codes = [ get_language_code(l, feature_database) for l in lang_codes ]
all_languages = list(feature_database["langs"])
feature_names = [ "ID_" + l.upper() for l in all_languages ]
values = np.zeros((len(lang_codes), len(feature_names)))
for i, lang_code in enumerate(lang_codes):
feature_index = get_language_index(lang_code, feature_database)
values[i, feature_index] = 1.0
return feature_names, values
def unpickle(file):
    import pickle
    # Open in binary mode and let the context manager close the file.
    with open(file, 'rb') as fo:
        data = pickle.load(fo, encoding='latin1')
    return data
def load_pkl(path):
    # Pickled files must be opened in binary mode.
    with open(path, 'rb') as f:
        obj = cPickle.load(f)
        print(" [*] load %s" % path)
        return obj
def load_npy(path):
obj = np.load(path)
print(" [*] load %s" % path)
return obj
def load(self, local_dir_=None):
    '''
    load dataset from local disk
    Args:
        local_dir_: string or None
            if None, will use default Dataset.DEFAULT_DIR
    '''
    if local_dir_ is None:
        local_dir = self.DEFAULT_DIR
    else:
        local_dir = Path(local_dir_)
    data_di = np.load(str(local_dir / 'cifar10.npz'))
    self.datum[:] = data_di['images']
    self.labels[:] = data_di['labels']
def install(
self, local_dst_dir_=None, local_src_dir_=None, clean_install_=False):
'''
Install the dataset into directly usable format,
requires downloading for public dataset.
Args:
local_dst_dir_: string or None
where to install the dataset, None -> "%(default_dir)s"
local_src_dir_: string or None
where to find the raw downloaded files, None -> "%(default_dir)s"
'''
local_dst_dir = self.DEFAULT_DIR if local_dst_dir_ is None else Path(local_dst_dir_)
local_src_dir = self.DEFAULT_DIR if local_src_dir_ is None else Path(local_src_dir_)
local_dst_dir.mkdir(parents=True, exist_ok=True)
assert local_src_dir.exists()
images = np.empty((60000,3,32,32), dtype=np.uint8)
labels = np.empty((60000,), dtype=np.uint8)
tarfile_name = str(local_src_dir / 'cifar-10-python.tar.gz')
with tarfile.open(tarfile_name, 'r:gz') as tf:
for i in range(5):
with tf.extractfile('cifar-10-batches-py/data_batch_%d'%(i+1)) as f:
data_di = pickle.load(f, encoding='bytes')
images[(10000*i):(10000*(i+1))] = data_di[b'data'].reshape((10000,3,32,32))
labels[(10000*i):(10000*(i+1))] = np.asarray(data_di[b'labels'], dtype=np.uint8)
with tf.extractfile('cifar-10-batches-py/test_batch') as f:
data_di = pickle.load(f, encoding='bytes')
images[50000:60000] = data_di[b'data'].reshape((10000,3,32,32))
labels[50000:60000] = data_di[b'labels']
np.savez_compressed(str(local_dst_dir / 'cifar10.npz'), images=images, labels=labels)
if clean_install_:
os.remove(tarfile_name)
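# install() above packs all 60000 CIFAR-10 images and labels into one compressed .npz
# archive, which load() then reads back by key. A tiny stand-in for that round trip
# with toy arrays; the /tmp path and function name are hypothetical.
def _npz_round_trip():
    import numpy as np
    images = np.zeros((10, 3, 32, 32), dtype=np.uint8)
    labels = np.zeros((10,), dtype=np.uint8)
    np.savez_compressed('/tmp/cifar10_toy.npz', images=images, labels=labels)
    data_di = np.load('/tmp/cifar10_toy.npz')
    assert data_di['images'].shape == (10, 3, 32, 32)
    assert data_di['labels'].dtype == np.uint8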
def load(self, local_dir_=None):
if local_dir_ is None:
local_dir = self.DEFAULT_DIR
else:
local_dir = Path(local_dir_)
data = np.load(str(local_dir / 'mnist.npz'))
self.labels = data['labels']
self.datum = data['images']
self.label_map = np.arange(10)
self.imsize = (1,28,28)
def load(self, local_dir_=None):
# TODO
raise NotImplementedError()
def load_aggregate_masks_scans (masks_mnames, grids, upgrid_multis):
scans = []
masks = []
igrid = 0
for masks_names in masks_mnames:
if (len(masks_names) > 0):
grid = grids[igrid]
upgrid_multi = upgrid_multis[igrid]
upgcount = upgrid_multi * upgrid_multi
scans1 = []
masks1 = []
for masks_name in masks_names:
print ("Loading: ", masks_name)
masks0 = np.load(''.join((masks_name, ".npz")))['arr_0']
scans0 = np.load(''.join((masks_name.replace("masks_", "scans_", 1), ".npz")))['arr_0']
masks1.append(masks0)
scans1.append(scans0)
scans1 = np.vstack(scans1)
masks1 = np.vstack(masks1)
if len(masks) > 0:
scans1 = np.vstack([scans1, scans])
masks1 = np.vstack([masks1, masks])
lm = len(masks1) // upgcount * upgcount
scans1 = scans1[0:lm] # cut to multiples of upgcount
masks1 = masks1[0:lm]
index_shuf = np.arange(lm)
np.random.shuffle(index_shuf)
scans1 = scans1[index_shuf]
masks1 = masks1[index_shuf]
scans = data_from_grid_by_proximity(scans1, upgrid_multi, upgrid_multi, grid=grid)
masks = data_from_grid_by_proximity(masks1, upgrid_multi, upgrid_multi, grid=grid)
igrid += 1
return masks, scans
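# The function above truncates the stacked arrays to a multiple of upgcount and then
# shuffles scans and masks with the same permutation so the pairs stay aligned. A
# self-contained sketch of that step with toy arrays (names and sizes are arbitrary).
def _shuffle_in_unison_example():
    import numpy as np
    upgcount = 4
    scans1 = np.arange(10)[:, None]           # stand-in for stacked scan volumes
    masks1 = np.arange(10)[:, None]           # and their matching masks
    lm = len(masks1) // upgcount * upgcount   # cut to a multiple of upgcount
    scans1, masks1 = scans1[:lm], masks1[:lm]
    index_shuf = np.arange(lm)
    np.random.shuffle(index_shuf)
    scans1, masks1 = scans1[index_shuf], masks1[index_shuf]
    assert np.array_equal(scans1, masks1)     # the scan/mask pairing is preserved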