def generate_patch_probs(path, patch_locations, patch_size, im_size):
    """Compute a sampling probability for every candidate patch centre.

    The first file matching ``*_seg.nii.gz`` inside ``path`` is loaded, and
    for every combination of the x, y and z centre coordinates a cubic patch
    of side ``patch_size`` is cut out.  Each patch is scored from the
    fraction of its voxels that are labelled (> 0); patches scoring zero are
    raised to the smallest non-zero score and the scores are normalised to
    sum to one.

    Parameters
    ----------
    path : str
        Directory containing the ``*_seg.nii.gz`` file.
    patch_locations : tuple
        Three sequences ``(x, y, z)`` of patch-centre coordinates.
    patch_size : int
        Edge length of the cubic patch.
    im_size : object
        Unused; kept so existing callers keep working.

    Returns
    -------
    np.ndarray
        1-D float32 array ordered x-major, then y, then z.  If no patch
        contains any labelled voxel the distribution is uniform; if there are
        no patch locations the array is empty.
    """
    x, y, z = patch_locations
    seg_file = glob.glob(os.path.join(path, '*_seg.nii.gz'))[0]
    # np.asanyarray(img.dataobj) is the documented stand-in for the removed get_data()
    seg = np.asanyarray(nib.load(seg_file).dataobj).astype(np.float32)

    half = patch_size / 2
    patch_volume = patch_size * patch_size * patch_size

    p = []
    for cx in x:
        for cy in y:
            for cz in z:
                patch = seg[int(cx - half):int(cx + half),
                            int(cy - half):int(cy + half),
                            int(cz - half):int(cz + half)]
                percent = np.count_nonzero(patch > 0) / patch_volume
                p.append((1 - np.abs(percent - 0.5)) * percent)

    p = np.asarray(p, dtype=np.float32)
    nonzero_scores = p[p != 0]
    if nonzero_scores.size == 0:
        # Previously np.amin failed on the empty selection; fall back to a
        # uniform distribution (or an empty array when there are no patches).
        if p.size:
            p[:] = 1.0 / p.size
        return p

    # Give empty patches the smallest observed score so they can still be drawn.
    p[p == 0] = np.amin(nonzero_scores)
    return p / np.sum(p)
# Example source code for Python `load()`
# Source file: resnet_regressor.py
# Project: Brain_Tumor_Segmentation
# Author: KarthikRevanuru
# Project source
# File source
# Views: 29
# Favorites: 0
# Likes: 0
# Comments: 0
def train_xgboost():
    """Fit an XGBoost regressor on per-subject image features plus age.

    Reads ``survival_data.csv`` (indexed by its first column).  For every
    entry of the module-level ``folder_names_train`` the saved ``.npy``
    arrays of the flair, t1, t1ce and t2 modalities are loaded, averaged over
    their first axis and concatenated; the subject's ``Age`` is appended as
    the last feature column and ``Survival`` is the regression target.

    Subjects whose ``Survival`` or ``Age`` entry cannot be looked up are
    reported and left out, so features and targets stay row-aligned.
    (Previously such a subject produced a target without a matching age,
    which made the final concatenation fail.)

    Returns
    -------
    xgb.XGBRegressor
        The fitted regressor.
    """
    df = pd.read_csv('survival_data.csv', index_col=0, encoding = 'UTF-7')

    def modality_features(suffix):
        # One row per training folder: the array averaged over its first axis.
        return np.array([
            np.mean(np.load('training/%s_%s.nii.gz.npy' % (str(name), suffix)), axis=0)
            for name in folder_names_train
        ])

    x = np.concatenate(
        [modality_features(suffix) for suffix in ('flair', 't1', 't1ce', 't2')],
        axis=1)

    survival = []
    age = []
    kept_rows = []
    missing = 0
    for row, name in enumerate(folder_names_train):
        key = str(name)
        try:
            # DataFrame.at replaces DataFrame.get_value, removed in pandas 1.0.
            subject_survival = df.at[key, 'Survival']
            subject_age = df.at[key, 'Age']
        except KeyError as e:
            missing += 1
            print (missing, str(e), "Label Not found, deleting entry")
            continue
        survival.append(subject_survival)
        age.append(subject_age)
        kept_rows.append(row)

    y = np.asarray(survival)
    z = np.asarray(age).reshape(-1, 1)
    x = np.concatenate((x[kept_rows], z), axis=1)

    clf = xgb.XGBRegressor()
    clf.fit(x, y)
    return clf
def load_nifti(filename, with_affine=False):
    """
    Load image data from a NIfTI file.

    Parameters
    ----------
    filename : str
        filename of NIFTI file
    with_affine : bool
        if True, the image affine is returned as well

    Returns
    -------
    data : np.ndarray
        C-ordered copy of the image data
    affine : np.ndarray
        only returned when ``with_affine`` is True
    """
    img = nib.load(filename)
    # np.asanyarray(img.dataobj) is the documented stand-in for the removed get_data()
    data = np.copy(np.asanyarray(img.dataobj), order="C")
    if with_affine:
        return data, img.affine
    return data
def load_channels(self,normalize = False):
    """Stack the FLAIR and T1 volumes into one float32 array.

    Parameters
    ----------
    normalize : bool
        If True, every channel is passed through ``normalize_image`` with the
        mask returned by ``self.load_ROI_mask()``.

    Returns
    -------
    np.ndarray
        Array of shape ``volume_shape + (2,)``; channel 0 is FLAIR and
        channel 1 is T1.  When ``self.data_augmentation`` is truthy each
        volume is passed through ``flip_plane`` first.
    """
    modalities = [nib.load(self.FLAIR_FILE), nib.load(self.T1_FILE)]
    channels = np.zeros( modalities[0].shape + (2,), dtype=np.float32)

    # The mask is the same for every channel, so compute it once instead of
    # once per modality.
    mask = self.load_ROI_mask() if normalize else None

    for index_mod, mod in enumerate(modalities):
        volume = np.asarray(mod.dataobj)
        if self.data_augmentation:
            volume = flip_plane(volume)
        channels[:, :, :, index_mod] = volume
        if normalize:
            channels[:, :, :, index_mod] = normalize_image(channels[:, :, :, index_mod], mask=mask)
    return channels
def load_channels(self,normalize = False):
    """Load FLAIR and T1 and return them stacked along a trailing channel axis.

    Parameters
    ----------
    normalize : bool
        When True, ``normalize_image`` is applied to each channel using the
        mask from ``self.load_ROI_mask()``.

    Returns
    -------
    np.ndarray
        float32 array shaped ``volume_shape + (2,)`` (FLAIR first, then T1).
        Volumes go through ``flip_plane`` when ``self.data_augmentation`` is
        truthy.
    """
    modalities = [nib.load(self.FLAIR_FILE), nib.load(self.T1_FILE)]
    channels = np.zeros( modalities[0].shape + (2,), dtype=np.float32)

    # Build the ROI mask a single time; it does not depend on the modality.
    roi_mask = self.load_ROI_mask() if normalize else None

    for index_mod, mod in enumerate(modalities):
        volume = np.asarray(mod.dataobj)
        if self.data_augmentation:
            volume = flip_plane(volume)
        channels[:, :, :, index_mod] = volume
        if normalize:
            channels[:, :, :, index_mod] = normalize_image(channels[:, :, :, index_mod], mask=roi_mask)
    return channels
def load_ROI_mask(self):
    """Build a boolean region-of-interest mask from the FLAIR volume.

    Voxels whose FLAIR intensity is below 90 are set to zero in an otherwise
    all-ones mask.  The mask is zero-padded by 21 voxels on every side,
    closed with a 20x20x20 structuring element via ``binary_closing``, and
    cropped back to the original extent.

    Returns
    -------
    np.ndarray
        Boolean array with the same shape as the FLAIR volume.
    """
    proxy = nib.load(self.FLAIR_FILE)
    image_array = np.asarray(proxy.dataobj)

    mask = np.ones_like(image_array)
    mask[image_array < 90] = 0

    pad = 21
    struct_element_size = (20, 20, 20)
    mask_augmented = np.pad(mask, [(pad, pad)] * 3, 'constant', constant_values=(0, 0))
    # The result used to be cast through np.int (removed in NumPy 1.24) before
    # being cast to bool; the intermediate cast is dropped.
    mask_augmented = binary_closing(mask_augmented, structure=np.ones(struct_element_size, dtype=bool))
    return mask_augmented[pad:-pad, pad:-pad, pad:-pad].astype('bool')
def get_subject_shape(self):
    """Return the array shape of the first available image of this subject.

    The flags are checked in the order labels, T1, T1c, T2, FLAIR and the
    file belonging to the first truthy flag is opened to read its shape.

    Returns
    -------
    tuple
        ``shape`` of the loaded image.

    Raises
    ------
    ValueError
        If none of the flags is set.
    """
    if self.booleanLabels:
        proxy = nib.load(self.GT_FILE)
    elif self.booleanT1:
        proxy = nib.load(self.T1_FILE)
    elif self.booleanT1c:
        proxy = nib.load(self.T1c_FILE)
    elif self.booleanT2:
        proxy = nib.load(self.T2_FILE)
    elif self.booleanFLAIR:
        proxy = nib.load(self.FLAIR_FILE)
    else:
        # str() keeps a non-string id from turning this into a TypeError.
        raise ValueError("[src.helpers.Subject] No modalities are given for subject: " + str(self.id))
    return proxy.shape
def load_channels(self):
    """Stack the enabled modalities into a channels-first float32 array.

    Modalities are considered in the fixed order FLAIR, T1, T1c, T2 and only
    those whose ``boolean*`` flag is truthy are loaded.  (Previously a
    disabled modality assigned ``None`` into the array, which raised an
    IndexError whenever the last slot was the disabled one, and every file
    was opened even when its modality was disabled.)

    Returns
    -------
    np.ndarray
        Array of shape ``(num_enabled,) + volume_shape``, where the volume
        shape is taken from the first enabled modality.

    Raises
    ------
    ValueError
        If no modality is enabled.
    """
    candidates = (
        (self.booleanFLAIR, self.FLAIR_FILE),
        (self.booleanT1, self.T1_FILE),
        (self.booleanT1c, self.T1c_FILE),
        (self.booleanT2, self.T2_FILE),
    )
    enabled_files = [filename for enabled, filename in candidates if enabled]
    if not enabled_files:
        raise ValueError("No input modality is enabled; cannot build channels.")

    volumes = [nib.load(filename) for filename in enabled_files]
    channels = np.zeros((len(volumes),) + volumes[0].shape, dtype=np.float32)
    for index, volume in enumerate(volumes):
        channels[index] = np.asarray(volume.dataobj)
    return channels
def load_channels(self, normalize=False):
    """Stack FLAIR, T1, T1c and T2 into one float32 array.

    Parameters
    ----------
    normalize : bool
        If True, each channel is passed through ``normalize_image`` with the
        mask from ``self.load_ROI_mask()`` (computed once).

    Returns
    -------
    np.ndarray
        Array of shape ``volume_shape + (4,)`` in the order FLAIR, T1, T1c,
        T2.  When ``self.data_augmentation_flag`` is truthy every volume is
        passed through ``flip_plane`` with ``plane=self.data_augmentation``.
    """
    modalities = [
        nib.load(self.FLAIR_FILE),
        nib.load(self.T1_FILE),
        nib.load(self.T1c_FILE),
        nib.load(self.T2_FILE),
    ]
    channels = np.zeros(modalities[0].shape + (4,), dtype=np.float32)
    mask = self.load_ROI_mask() if normalize else None

    for channel_index, proxy in enumerate(modalities):
        volume = np.asarray(proxy.dataobj)
        if self.data_augmentation_flag:
            volume = flip_plane(volume, plane=self.data_augmentation)
        channels[:, :, :, channel_index] = volume
        if normalize:
            normalized = normalize_image(channels[:, :, :, channel_index], mask=mask)
            channels[:, :, :, channel_index] = normalized
    return channels
def load_channels(self, normalize=False):
    """Load the four modalities and return them along a trailing channel axis.

    Parameters
    ----------
    normalize : bool
        When True, ``normalize_image`` is applied per channel using the mask
        returned by ``self.load_ROI_mask()``, which is computed once.

    Returns
    -------
    np.ndarray
        float32 array shaped ``volume_shape + (4,)``; channels are FLAIR, T1,
        T1c, T2 in that order.  Volumes are passed through ``flip_plane``
        (``plane=self.data_augmentation``) when
        ``self.data_augmentation_flag`` is truthy.
    """
    modalities = [
        nib.load(self.FLAIR_FILE),
        nib.load(self.T1_FILE),
        nib.load(self.T1c_FILE),
        nib.load(self.T2_FILE),
    ]
    channels = np.zeros(modalities[0].shape + (4,), dtype=np.float32)
    roi_mask = self.load_ROI_mask() if normalize else None

    for position, proxy in enumerate(modalities):
        raw = np.asarray(proxy.dataobj)
        if self.data_augmentation_flag:
            raw = flip_plane(raw, plane=self.data_augmentation)
        channels[:, :, :, position] = raw
        if normalize:
            rescaled = normalize_image(channels[:, :, :, position], mask=roi_mask)
            channels[:, :, :, position] = rescaled
    return channels
def load_channels(self, normalize=False):
    """Stack the T2 and T1 volumes into one float32 array.

    Parameters
    ----------
    normalize : bool
        If True, every channel is passed through ``normalize_image`` with the
        mask returned by ``self.load_ROI_mask()``.

    Returns
    -------
    np.ndarray
        Array of shape ``volume_shape + (2,)``; channel 0 is T2 and channel 1
        is T1.  When ``self.data_augmentation`` is truthy each volume is
        passed through ``flip_plane`` with ``plane=self.data_augmentation``.
    """
    modalities = [nib.load(self.T2_FILE), nib.load(self.T1_FILE)]
    channels = np.zeros(modalities[0].shape + (2,), dtype=np.float32)

    # The mask does not depend on the modality: compute it once, not per channel.
    mask = self.load_ROI_mask() if normalize else None

    for index_mod, mod in enumerate(modalities):
        volume = np.asarray(mod.dataobj)
        if self.data_augmentation:
            volume = flip_plane(volume, plane=self.data_augmentation)
        channels[:, :, :, index_mod] = volume
        if normalize:
            channels[:, :, :, index_mod] = normalize_image(channels[:, :, :, index_mod], mask=mask)
    return channels
def test_individual_stability_matrix():
    """
    Run ``utils.individual_stability_matrix`` on the output of ``generate_blobs()``.

    A stored reference matrix is loaded, but no comparison against it has
    been implemented yet, so the test ends in an unconditional failure.
    """
    import utils
    import numpy as np
    import scipy as sp

    reference_file = home + '/git_repo/PyBASC/tests/ism_test.npy'
    desired = np.load(reference_file)
    ism = utils.individual_stability_matrix(generate_blobs(), 20, 3)

    # TODO: decide how ``ism`` should be compared with ``desired``.
    # Options noted so far:
    #   np.corrcoef(ism.flatten(), desired.flatten())
    #   np.testing.assert_equal(ism, desired)
    #   sp.spatial.distance.cdist(ism, desired, metric='correlation')
    assert False
def test_ndarray_to_vol():
    """Call ``basc.ndarray_to_vol`` on ROI-masked sample data.

    The sample volume is loaded as float32, reduced to the voxels selected
    by the boolean ROI mask, and handed to ``basc.ndarray_to_vol`` together
    with the output filename.  No assertion is made on the result.
    """
    import basc
    import nibabel as nb
    import numpy as np

    subject_file = home + '/git_repo/PyBASC/sample_data/test.nii.gz'
    roi_mask_file= home + '/git_repo/PyBASC/masks/LC_Quarter_Res.nii.gz'
    filename=home + '/git_repo/PyBASC/sample_data/ndarray_to_vol_test.nii.gz'

    # np.asanyarray(img.dataobj) is the documented stand-in for the removed get_data()
    data = np.asanyarray(nb.load(subject_file).dataobj).astype('float32')
    print( 'Data Loaded')
    roi_mask_nparray = np.asanyarray(nb.load(roi_mask_file).dataobj).astype('float32').astype('bool')
    data_array = data[roi_mask_nparray]

    # NOTE(review): the mask file is passed as both the second and third
    # argument, exactly as before; a `sample_file` local was assigned but
    # never used -- confirm whether the third argument should be subject_file.
    basc.ndarray_to_vol(data_array, roi_mask_file, roi_mask_file, filename)
def analyse_data(input_dir):
    """Collect the shape and relative volume of every '.OT' image.

    Walks the sub-folders of ``input_dir`` two levels deep.  Sub-folders whose
    image type (per ``get_image_type_from_folder_name``) is not ``'.OT'`` are
    skipped.  For the others, the first ``.nii`` file found is loaded and its
    shape and ``100 * sum(voxels) / number_of_voxels`` are recorded.

    Parameters
    ----------
    input_dir : str
        Root directory to scan.

    Returns
    -------
    shapes : list of tuple
        Shape of every analysed image.
    relative_volumes : list
        Percentage value described above, one per analysed image.
    """
    shapes = []
    relative_volumes = []
    for folder in get_sub_folders(input_dir):
        print(folder)
        for sub_folder in get_sub_folders(os.path.join(input_dir, folder)):
            # Only the '.OT' images are analysed (raw data is too heavy).
            if get_image_type_from_folder_name(sub_folder) != '.OT':
                continue
            path = os.path.join(input_dir, folder, sub_folder)
            filename = next(name for name in os.listdir(path) if get_extension(name) == '.nii')
            # np.asanyarray(img.dataobj) is the documented stand-in for the removed get_data()
            image = np.asanyarray(nib.load(os.path.join(path, filename)).dataobj)
            shapes.append(image.shape)
            relative_volumes.append(100 * np.sum(image) / np.prod(image.shape))
    return shapes, relative_volumes
# train
def export_data(prediction_dir, nii_image_dir, tfrecords_dir, export_dir, transformation=None):
    """Export every prediction as a NIfTI file modelled on the subject's ADC image.

    For each file in ``prediction_dir`` the prediction is read, optionally
    post-processed by ``transformation.transform_image`` (using the original
    image read from the matching ``.tfrecord``), written into an array with
    the shape and dtype of the subject's ``MR_ADC`` image, and saved with
    that image's affine and header as
    ``SMIR.<name>.<id>.nii`` in ``export_dir``; ``<id>`` is the last
    dot-separated part of the subject's ``MR_MTT`` entry.

    Parameters
    ----------
    prediction_dir : str
        Directory holding the prediction files.
    nii_image_dir : str
        Directory with one sub-directory per subject name.
    tfrecords_dir : str
        Directory with ``<name>.tfrecord`` files (only read when
        ``transformation`` is given).
    export_dir : str
        Destination directory.
    transformation : object, optional
        Object providing ``transform_image(prediction, probability, image)``.
    """
    for file_path in os.listdir(prediction_dir):
        name, prediction, probability = read_prediction_file(os.path.join(prediction_dir, file_path))
        if transformation:
            image, _ = get_original_image(os.path.join(tfrecords_dir, name + '.tfrecord'), False)
            prediction = transformation.transform_image(prediction, probability, image)

        subject_dir = os.path.join(nii_image_dir, name)
        adc_name = next(l for l in os.listdir(subject_dir) if 'MR_ADC' in l)
        template = nib.load(os.path.join(subject_dir, adc_name, adc_name + '.nii'))

        # Build the output image explicitly rather than mutating the array
        # cached by the removed get_data(): whether such a mutation reached
        # the saved file depended on nibabel internals.
        data = np.array(template.dataobj)
        data[:] = prediction
        export_image = template.__class__(data, template.affine, template.header)

        # set name to specification and export
        _id = next(l for l in os.listdir(subject_dir) if 'MR_MTT' in l).split('.')[-1]
        export_path = os.path.join(export_dir, 'SMIR.' + name + '.' + _id + '.nii')
        nib.save(export_image, export_path)
def load(self, path):
    """ Load the image data and associated metadata.

    Parameters
    ----------
    path: str
        the path to the image to be loaded.

    Return
    ------
    image: Image
        the loaded image; its spacing comes from the header zooms and its
        metadata records ``path``.
    """
    import numpy as np

    _image = nibabel.load(path)
    return Image(spacing=_image.header.get_zooms(),
                 data_type="scalar",
                 metadata={"path": path},
                 # documented stand-in for the removed get_data()
                 data=np.asanyarray(_image.dataobj))
def _get_investigation_template(bids_directory, mri_par_names):
    """Return the investigation template filled in for ``bids_directory``.

    The template text is read from ``i_investigation_template.txt``, whose
    location is derived from this module's path (presumably the same
    directory -- ``opj``/``psplit`` are assumed to be ``os.path.join`` /
    ``os.path.split`` aliases; TODO confirm).  ``"[TODO: TITLE]"`` is replaced
    by the ``"Name"`` entry of ``dataset_description.json`` when that file
    and key exist, otherwise by the last component of ``bids_directory``;
    ``"[TODO: MRI_PAR_NAMES]"`` is replaced by ``mri_par_names`` joined with
    ``";"``.

    Parameters
    ----------
    bids_directory : str
        Path of the dataset directory.
    mri_par_names : iterable of str
        Parameter names to insert into the template.

    Returns
    -------
    str
        The filled-in template text.
    """
    this_path = os.path.realpath(
        __file__[:-1] if __file__.endswith('.pyc') else __file__)
    template_path = opj(
        *(psplit(this_path)[:-1] + ("i_investigation_template.txt", )))
    # Use a context manager so the template file handle is always closed.
    with open(template_path) as template_fp:
        investigation_template = template_fp.read()

    title = psplit(bids_directory)[-1]
    description_path = opj(bids_directory, "dataset_description.json")
    if exists(description_path):
        with open(description_path, "r") as description_dict_fp:
            description_dict = json.load(description_dict_fp)
        if "Name" in description_dict:
            title = description_dict["Name"]

    investigation_template = investigation_template.replace(
        "[TODO: TITLE]", title)
    investigation_template = investigation_template.replace(
        "[TODO: MRI_PAR_NAMES]", ";".join(mri_par_names))
    return investigation_template
def get_all_nifti_data(directory, map_names=None, deferred=True):
    """Return the data of the nifti volumes found in a directory.

    The volumes are obtained from ``load_all_niftis``; when ``map_names`` is
    given it is forwarded so only those maps are considered.

    Args:
        directory (str): the directory to read the maps from
        map_names (list of str): optional names of the maps to use
        deferred (boolean): if True, return a ``DeferredActionDict`` that
            calls ``get_data()`` on an item on demand; if False, return a
            plain dictionary with ``get_data()`` already called on every item.

    Returns:
        dict: mapping with the same keys as returned by ``load_all_niftis``
            and the volume data as values.
    """
    proxies = load_all_niftis(directory, map_names=map_names)
    if not deferred:
        return {name: proxy.get_data() for name, proxy in proxies.items()}
    return DeferredActionDict(lambda _, item: item.get_data(), proxies)
def get_cut_size(data_path):
    """Find one cropping box that covers the boxes of subjects 1 to 10.

    For each ``subject-<i>-T1.hdr`` in ``data_path`` (i = 1..10) the bounds
    returned by ``cut_edge(volume, margin)`` are collected (``margin`` is a
    module-level value); the result takes the minimum of every start bound
    and the maximum of every end bound.

    Parameters
    ----------
    data_path : str
        Directory containing the subject files.

    Returns
    -------
    tuple
        ``(min_D_s, max_D_e, min_H_s, max_H_e, min_W_s, max_W_e)``.
    """
    bounds = []
    for i in range(1, 10+1):
        subject_name = 'subject-%d-' % i
        f = os.path.join(data_path, subject_name+'T1.hdr')
        # np.asanyarray(img.dataobj) is the documented stand-in for the removed get_data()
        inputs_tmp = np.asanyarray(nib.load(f).dataobj)
        bounds.append(tuple(cut_edge(inputs_tmp, margin)))  # margin: how many zero margins to keep

    list_D_s, list_D_e, list_H_s, list_H_e, list_W_s, list_W_e = zip(*bounds)
    min_D_s, max_D_e, min_H_s, max_H_e, min_W_s, max_W_e = \
        min(list_D_s), max(list_D_e), min(list_H_s), max(list_H_e), min(list_W_s), max(list_W_e)
    print("new size: ", min_D_s, max_D_e, min_H_s, max_H_e, min_W_s, max_W_e)
    return min_D_s, max_D_e, min_H_s, max_H_e, min_W_s, max_W_e
def check_atlas(atlas):
    """Validate the atlas input.

    A string that is not an existing path is treated as the name of a
    pre-defined atlas: it is lower-cased and must be in
    ``parcellate.atlas_list``.  An existing directory must pass
    ``parcellate.check_atlas_annot_exist``.  Any other existing path is
    loaded with nibabel and the loaded image is returned.

    Returns
    -------
    The lower-cased atlas name, the unchanged directory path, or the loaded
    image, depending on the case above.

    Raises
    ------
    NotImplementedError
        If ``atlas`` is not a string.
    ValueError
        If the name is unknown, the directory lacks the annot files, or the
        file cannot be read.
    """
    if not isinstance(atlas, str):
        raise NotImplementedError('Atlas must be a string, providing a name or path to Freesurfer folder or a 3D nifti volume.')

    if not pexists(atlas): # just a name
        atlas = atlas.lower()
        if atlas not in parcellate.atlas_list:
            raise ValueError('Invalid choice of atlas. Accepted : {}'.format(parcellate.atlas_list))
    elif os.path.isdir(atlas): # cortical atlas in Freesurfer org
        if not parcellate.check_atlas_annot_exist(atlas):
            raise ValueError('Given atlas folder does not contain Freesurfer label annot files. '
                             'Needed : given_atlas_dir/label/?h.aparc.annot')
    elif pexists(atlas): # may be a volumetric atlas?
        try:
            atlas = nibabel.load(atlas)
        except Exception:
            # Was a bare `except:`, which also trapped KeyboardInterrupt/SystemExit.
            traceback.print_exc()
            raise ValueError('Unable to read the provided image volume. Must be a nifti 2d volume, readable by nibabel.')
    else:
        # Only reachable if the path stops existing between the checks above.
        raise ValueError('Unable to decipher or use the given atlas.')

    return atlas
def _add_provenance(in_file, settings, air_msk, rot_msk):
    """Build the provenance dictionary for ``in_file``.

    Parameters
    ----------
    in_file : str
        File whose hash (via ``hash_infile``) is stored under ``'md5sum'``.
    settings : dict or None
        Stored under ``'settings'`` when truthy.
    air_msk, rot_msk : str
        Mask images; the sum of their voxels (cast to uint8) drives the
        warnings.

    Returns
    -------
    dict
        Keys ``md5sum``, ``version``, ``software`` and ``warnings``
        (``small_air_mask`` when the air-mask sum is below 5e5,
        ``large_rot_frame`` when the rotation-mask sum exceeds 500), plus
        ``settings`` if given.
    """
    from mriqc import __version__ as version
    from niworkflows.nipype.utils.filemanip import hash_infile
    import nibabel as nb
    import numpy as np

    def mask_size(mask_file):
        # np.asanyarray(img.dataobj) is the documented stand-in for the removed get_data()
        return np.asanyarray(nb.load(mask_file).dataobj).astype(np.uint8).sum()

    air_msk_size = mask_size(air_msk)
    rot_msk_size = mask_size(rot_msk)

    out_prov = {
        'md5sum': hash_infile(in_file),
        'version': version,
        'software': 'mriqc',
        'warnings': {
            'small_air_mask': bool(air_msk_size < 5e5),
            'large_rot_frame': bool(rot_msk_size > 500),
        }
    }

    if settings:
        out_prov['settings'] = settings

    return out_prov
def _binarize(in_file, threshold=0.5, out_file=None):
    """Write a uint8 image that is 1 where ``in_file`` exceeds ``threshold``.

    Parameters
    ----------
    in_file : str
        Input image path.
    threshold : float
        Voxels strictly greater than this become 1, all others 0.
    out_file : str, optional
        Output path; defaults to ``<basename>_bin<ext>`` in the current
        directory, where a trailing ``.gz`` is kept together with the
        extension before it.

    Returns
    -------
    str
        Path of the written file.
    """
    import os.path as op

    import numpy as np
    import nibabel as nb

    if out_file is None:
        fname, ext = op.splitext(op.basename(in_file))
        if ext == '.gz':
            fname, ext2 = op.splitext(fname)
            ext = ext2 + ext
        out_file = op.abspath('{}_bin{}'.format(fname, ext))

    nii = nb.load(in_file)
    # A single comparison replaces the former two-step in-place masking,
    # which left negative values (for a negative threshold) or NaNs in the
    # array before the uint8 cast.
    data = (np.asanyarray(nii.dataobj) > threshold).astype(np.uint8)

    hdr = nii.header.copy()
    hdr.set_data_dtype(np.uint8)
    nb.Nifti1Image(data, nii.affine, hdr).to_filename(out_file)
    return out_file
def image_gradient(in_file, snr, out_file=None):
    """Compute the Gaussian gradient magnitude of an image and save it.

    The data is scaled so its 99.5th percentile equals 100, the gradient
    magnitude is computed with sigma 3.0, and the gradient is scaled the
    same way before being written with the input's affine and header.

    Parameters
    ----------
    in_file : str
        Input image path.
    snr : object
        Not used by this function; kept for interface compatibility.
    out_file : str, optional
        Output path; defaults to ``<basename>_grad<ext>`` in the current
        directory.

    Returns
    -------
    str
        Path of the written file.
    """
    import os.path as op
    import numpy as np
    import nibabel as nb
    from scipy.ndimage import gaussian_gradient_magnitude as gradient

    if out_file is None:
        fname, ext = op.splitext(op.basename(in_file))
        if ext == '.gz':
            fname, ext2 = op.splitext(fname)
            ext = ext2 + ext
        out_file = op.abspath('{}_grad{}'.format(fname, ext))

    imnii = nb.load(in_file)
    # np.asanyarray(img.dataobj) is the documented stand-in for the removed get_data()
    data = np.asanyarray(imnii.dataobj).astype(np.float32)
    datamax = np.percentile(data.reshape(-1), 99.5)
    data *= 100 / datamax
    grad = gradient(data, 3.0)
    gradmax = np.percentile(grad.reshape(-1), 99.5)
    grad *= 100.
    grad /= gradmax

    # .affine / .header replace the removed get_affine() / get_header()
    nb.Nifti1Image(grad, imnii.affine, imnii.header).to_filename(out_file)
    return out_file
def thresh_image(in_file, thres=0.5, out_file=None):
    """Threshold an image and save the result.

    Voxels below ``thres`` are set to 0, then all voxels greater than 0 are
    set to 1.  The result is written with the input's affine and header.

    Parameters
    ----------
    in_file : str
        Input image path.
    thres : float
        Threshold value.
    out_file : str, optional
        Output path; defaults to ``<basename>_thresh<ext>`` in the current
        directory.

    Returns
    -------
    str
        Path of the written file.
    """
    import os.path as op
    import numpy as np
    import nibabel as nb

    if out_file is None:
        fname, ext = op.splitext(op.basename(in_file))
        if ext == '.gz':
            fname, ext2 = op.splitext(fname)
            ext = ext2 + ext
        out_file = op.abspath('{}_thresh{}'.format(fname, ext))

    im = nb.load(in_file)
    # Work on an explicit in-memory copy; get_data() has been removed.
    data = np.array(im.dataobj)
    data[data < thres] = 0
    data[data > 0] = 1

    # .affine / .header replace the removed get_affine() / get_header()
    nb.Nifti1Image(data, im.affine, im.header).to_filename(out_file)
    return out_file
def __init__(self, func, mask, seg=None, tr=None,
             title=None, figsize=DINA4_LANDSCAPE):
    """Load the functional, mask and segmentation data and create the figure.

    Parameters
    ----------
    func : str
        Path of the functional image; the size of its last axis is stored as
        ``ntsteps``.
    mask : str
        Path of the mask image.
    seg : str, optional
        Path of the segmentation image; when omitted, ``2 * mask_data`` is
        used instead.
    tr : float, optional
        Repetition time; when omitted, the last header zoom of ``func`` is
        used.
    title : str, optional
        Figure super-title.
    figsize : tuple
        Figure size passed to ``plt.figure``.
    """
    import numpy as np

    func_nii = nb.load(func)
    # np.asanyarray(img.dataobj) is the documented stand-in for the removed get_data()
    self.func_data = np.asanyarray(func_nii.dataobj)
    self.mask_data = np.asanyarray(nb.load(mask).dataobj)

    self.ntsteps = self.func_data.shape[-1]
    self.tr = tr
    if tr is None:
        # .header replaces the removed get_header()
        self.tr = func_nii.header.get_zooms()[-1]

    if seg is None:
        self.seg_data = 2 * self.mask_data
    else:
        self.seg_data = np.asanyarray(nb.load(seg).dataobj)

    self.fig = plt.figure(figsize=figsize)
    if title is not None:
        self.fig.suptitle(title, fontsize=20)

    self.confounds = []
    self.spikes = []
def _get_limits(nifti_file, only_plot_noise=False):
from builtins import bytes, str # pylint: disable=W0622
if isinstance(nifti_file, (str, bytes)):
nii = nb.as_closest_canonical(nb.load(nifti_file))
data = nii.get_data()
else:
data = nifti_file
data_mask = np.logical_not(np.isnan(data))
if only_plot_noise:
data_mask = np.logical_and(data_mask, data != 0)
vmin = np.percentile(data[data_mask], 0)
vmax = np.percentile(data[data_mask], 61)
else:
vmin = np.percentile(data[data_mask], 0.5)
vmax = np.percentile(data[data_mask], 99.5)
return vmin, vmax
def symmetrise_axial(self, filename_in, filename_out=None, axis='x', plane_intercept=10,
                     side_to_copy='below', keep_in_data_dimensions=True):
    """Symmetrise the data of an image and save the result.

    Input and output paths are resolved with ``get_pfi_in_pfi_out`` from
    ``self.pfo_in`` / ``self.pfo_out``.  The image data is passed to
    ``symmetrise_data`` with the given options, wrapped into a new image via
    ``set_new_data`` and saved.

    Parameters
    ----------
    filename_in : str
        Input image file name.
    filename_out : str, optional
        Output image file name.
    axis, plane_intercept, side_to_copy, keep_in_data_dimensions
        Forwarded unchanged to ``symmetrise_data``.

    Returns
    -------
    str
        Path of the saved image.
    """
    import numpy as np

    pfi_in, pfi_out = get_pfi_in_pfi_out(filename_in, filename_out, self.pfo_in, self.pfo_out)

    im_segm = nib.load(pfi_in)
    # np.asanyarray(img.dataobj) is the documented stand-in for the removed get_data()
    data_labels = np.asanyarray(im_segm.dataobj)
    data_symmetrised = symmetrise_data(data_labels,
                                       axis=axis,
                                       plane_intercept=plane_intercept,
                                       side_to_copy=side_to_copy,
                                       keep_in_data_dimensions=keep_in_data_dimensions)

    im_symmetrised = set_new_data(im_segm, data_symmetrised)
    nib.save(im_symmetrised, pfi_out)
    print('Symmetrised axis {0}, plane_intercept {1}, image of {2} saved in {3}.'.format(axis, plane_intercept, pfi_in, pfi_out))
    return pfi_out
def modify_affine(self, filename_in, affine_in, filename_out, q_form=True, s_form=True,
                  multiplication_side='left'):
    """Apply an affine to an image via ``modify_affine_transformation`` and save it.

    Parameters
    ----------
    filename_in, filename_out : str
        Input and output file names, resolved with ``get_pfi_in_pfi_out``.
    affine_in : str or np.ndarray
        Either the matrix itself or a file name relative to ``self.pfo_in``;
        names ending in ``.txt`` are read with ``np.loadtxt``, anything else
        with ``np.load``.
    q_form, s_form, multiplication_side
        Forwarded unchanged to ``modify_affine_transformation``.

    Raises
    ------
    IOError
        If ``affine_in`` is neither a string nor a numpy array.
    """
    pfi_in, pfi_out = get_pfi_in_pfi_out(filename_in, filename_out, self.pfo_in, self.pfo_out)

    if isinstance(affine_in, str):
        reader = np.loadtxt if affine_in.endswith('.txt') else np.load
        aff = reader(connect_path_tail_head(self.pfo_in, affine_in))
    elif isinstance(affine_in, np.ndarray):
        aff = affine_in
    else:
        raise IOError('parameter affine_in can be path to an affine matrix .txt or .npy or the numpy array'
                      'corresponding to the affine transformation.')

    source_image = nib.load(pfi_in)
    updated_image = modify_affine_transformation(source_image, aff, q_form=q_form, s_form=s_form,
                                                 multiplication_side=multiplication_side)
    nib.save(updated_image, pfi_out)
def apply_small_rotation(self, filename_in, filename_out, angle=np.pi/6, principal_axis='pitch'):
    """Right-multiply an image's affine by one or more rotations and save it.

    Parameters
    ----------
    filename_in, filename_out : str
        Input and output file names, resolved with ``get_pfi_in_pfi_out``.
    angle : float or list of float
        Angle passed as ``theta`` to ``get_small_orthogonal_rotation``.  When
        a list is given, ``principal_axis`` must be a list of the same length
        and the individual matrices are multiplied together in order.
    principal_axis : str or list of str
        Axis name(s) passed to ``get_small_orthogonal_rotation``.
    """
    if not isinstance(angle, list):
        new_aff = get_small_orthogonal_rotation(theta=angle, principal_axis=principal_axis)
    else:
        assert isinstance(principal_axis, list)
        assert len(principal_axis) == len(angle)
        new_aff = np.identity(4)
        for axis_name, theta in zip(principal_axis, angle):
            rotation = get_small_orthogonal_rotation(theta=theta, principal_axis=axis_name)
            new_aff = new_aff.dot(rotation)

    pfi_in, pfi_out = get_pfi_in_pfi_out(filename_in, filename_out, self.pfo_in, self.pfo_out)

    source_image = nib.load(pfi_in)
    rotated_image = modify_affine_transformation(im_input=source_image, new_aff=new_aff, q_form=True, s_form=True,
                                                 multiplication_side='right')
    nib.save(rotated_image, pfi_out)
def modify_translational_part(self, filename_in, filename_out, new_translation):
    """Replace the translation of an image via ``replace_translational_part`` and save it.

    Parameters
    ----------
    filename_in, filename_out : str
        Input and output file names, resolved with ``get_pfi_in_pfi_out``.
    new_translation : str, np.ndarray or list
        The new translation, or a file name relative to ``self.pfo_in``;
        names ending in ``.txt`` are read with ``np.loadtxt``, anything else
        with ``np.load``.  A list is converted with ``np.array``.

    Raises
    ------
    IOError
        If ``new_translation`` has none of the supported types.
    """
    pfi_in, pfi_out = get_pfi_in_pfi_out(filename_in, filename_out, self.pfo_in, self.pfo_out)
    source_image = nib.load(pfi_in)

    if isinstance(new_translation, str):
        reader = np.loadtxt if new_translation.endswith('.txt') else np.load
        tr = reader(connect_path_tail_head(self.pfo_in, new_translation))
    elif isinstance(new_translation, np.ndarray):
        tr = new_translation
    elif isinstance(new_translation, list):
        tr = np.array(new_translation)
    else:
        raise IOError('parameter new_translation can be path to an affine matrix .txt or .npy or the numpy array'
                      'corresponding to the new intended translational part.')

    translated_image = replace_translational_part(source_image, tr)
    nib.save(translated_image, pfi_out)