def test_covariance_distance_range():
factor = 10
m1 = np.random.randint(3, size=[20, 20, 20])
im1 = nib.Nifti1Image(m1, np.eye(4))
for _ in range(20):
m2 = np.random.randint(3, size=[20, 20, 20])
im2 = nib.Nifti1Image(m2, np.eye(4))
cd = covariance_distance(im1, im2, [1, 2], ['label1', 'label2'], factor=factor)
print m1
print m2
print cd
assert 0 <= cd['label1'] <= factor
assert 0 <= cd['label2'] <= factor
python类Nifti1Image()的实例源码
def test_set_new_header_description():
arr_data = np.zeros([10,10,10])
im = nib.Nifti1Image(arr_data, np.eye(4))
hd = im.header
hd['descrip'] = 'Old Spam'
im_new_header = set_new_header_description(im, new_header_description='New Spam')
new_hd = im_new_header.header
assert new_hd['descrip'] == 'New Spam'
def set_new_data(image, new_data, new_dtype=None, remove_nan=True):
"""
From a nibabel image and a numpy array it creates a new image with
the same header of the image and the new_data as its data.
:param image: nibabel image
:param new_data:
:param new_dtype: numpy array
:param remove_nan:
:return: nibabel image
"""
hd = image.header
if remove_nan:
new_data = np.nan_to_num(new_data)
# update data type:
if new_dtype is not None:
new_data = new_data.astype(new_dtype)
image.set_data_dtype(new_dtype)
# if nifty1
if hd['sizeof_hdr'] == 348:
new_image = nib.Nifti1Image(new_data, image.affine, header=hd)
# if nifty2
elif hd['sizeof_hdr'] == 540:
new_image = nib.Nifti2Image(new_data, image.affine, header=hd)
else:
raise IOError('Input image header problem')
return new_image
def save_as_nii(y, savename):
y_nii = nib.Nifti1Image(y.astype(np.uint8), np.eye(4))
nib.save(y_nii, savename + '.nii')
def test_scan(model, test_x_data, options, save_nifti= True, candidate_mask = None):
"""
Test data based on one model
Input:
- test_x_data: a nested dictionary containing training image paths:
train_x_data['scan_name']['modality'] = path_to_image_modality
- save_nifti: save image segmentation
- candidate_mask: a binary masks containing voxels to classify
Output:
- test_scan = Output image containing the probability output segmetnation
- If save_nifti --> Saves a nifti file at specified location options['test_folder']/['test_scan']
"""
# get_scan name and create an empty nifti image to store segmentation
scans = test_x_data.keys()
flair_scans = [test_x_data[s]['FLAIR'] for s in scans]
flair_image = load_nii(flair_scans[0])
seg_image = np.zeros_like(flair_image.get_data())
# compute lesion segmentation in batches of size options['batch_size']
for batch, centers in load_test_patches(test_x_data, options['patch_size'], options['batch_size'], candidate_mask):
y_pred = model.predict_proba(np.squeeze(batch))
[x, y, z] = np.stack(centers, axis=1)
seg_image[x, y, z] = y_pred[:, 1]
if save_nifti:
out_scan = nib.Nifti1Image(seg_image, affine=flair_image.affine)
out_scan.to_filename(os.path.join(options['test_folder'], options['test_scan'], options['experiment'], options['test_name']))
#out_scan.to_filename(os.path.join(test_folder, scan, options['experiment'], options['test_name']))
return seg_image
def post_process_segmentation(input_scan, options, save_nifti = True):
"""
Post-process the probabilistic segmentation using parameters t_bin and l_min
t_bin: threshold to binarize the output segmentations
l_min: minimum lesion volume
Inputs:
- input_scan: probabilistic input image (segmentation)
- options dictionary
- save_nifti: save the result as nifti
Output:
- output_scan: final binarized segmentation
"""
from scipy import ndimage
t_bin = options['t_bin']
l_min = options['l_min']
output_scan = np.zeros_like(input_scan)
# threshold input segmentation
t_segmentation = input_scan >= t_bin
# filter candidates by size and store those > l_min
labels, num_labels = ndimage.label(t_segmentation)
label_list = np.unique(labels)
num_elements_by_lesion = ndimage.labeled_comprehension(t_segmentation, labels, label_list, np.sum, float, 0)
for l in range(len(num_elements_by_lesion)):
if num_elements_by_lesion[l]>l_min:
# assign voxels to output
current_voxels = np.stack(np.where(labels == l), axis =1)
output_scan[current_voxels[:,0], current_voxels[:,1], current_voxels[:,2]] = 1
#save the output segmentation as Nifti1Image
if save_nifti:
nifti_out = nib.Nifti1Image(output_scan, np.eye(4))
nifti_out.to_filename(os.path.join(options['test_folder'], options['test_scan'], options['experiment'], options['test_name']))
return output_scan
def test_scan(model, test_x_data, options, save_nifti= True, candidate_mask = None):
"""
Test data based on one model
Input:
- test_x_data: a nested dictionary containing training image paths:
train_x_data['scan_name']['modality'] = path_to_image_modality
- save_nifti: save image segmentation
- candidate_mask: a binary masks containing voxels to classify
Output:
- test_scan = Output image containing the probability output segmetnation
- If save_nifti --> Saves a nifti file at specified location options['test_folder']/['test_scan']
"""
# get_scan name and create an empty nifti image to store segmentation
scans = test_x_data.keys()
flair_scans = [test_x_data[s]['FLAIR'] for s in scans]
flair_image = load_nii(flair_scans[0]).get_data()
seg_image = np.zeros_like(flair_image)
# compute lesion segmentation in batches of size options['batch_size']
for batch, centers in load_test_patches(test_x_data, options['patch_size'], options['batch_size'], candidate_mask):
y_pred = model.predict_proba(np.squeeze(batch))
[x, y, z] = np.stack(centers, axis=1)
seg_image[x, y, z] = y_pred[:, 1]
if save_nifti:
out_scan = nib.Nifti1Image(seg_image, np.eye(4))
out_scan.to_filename(os.path.join(options['test_folder'], options['test_scan'], options['experiment'], options['test_name']))
return seg_image
def post_process_segmentation(input_scan, options, save_nifti = True):
"""
Post-process the probabilistic segmentation using parameters t_bin and l_min
t_bin: threshold to binarize the output segmentations
l_min: minimum lesion volume
Inputs:
- input_scan: probabilistic input image (segmentation)
- options dictionary
- save_nifti: save the result as nifti
Output:
- output_scan: final binarized segmentation
"""
from scipy import ndimage
t_bin = options['t_bin']
l_min = options['l_min']
output_scan = np.zeros_like(input_scan)
# threshold input segmentation
t_segmentation = input_scan >= t_bin
# filter candidates by size and store those > l_min
labels, num_labels = ndimage.label(t_segmentation)
label_list = np.unique(labels)
num_elements_by_lesion = ndimage.labeled_comprehension(t_segmentation, labels, label_list, np.sum, float, 0)
for l in range(len(num_elements_by_lesion)):
if num_elements_by_lesion[l]>l_min:
# assign voxels to output
current_voxels = np.stack(np.where(labels == l), axis =1)
output_scan[current_voxels[:,0], current_voxels[:,1], current_voxels[:,2]] = 1
#save the output segmentation as Nifti1Image
if save_nifti:
nifti_out = nib.Nifti1Image(output_scan, np.eye(4))
nifti_out.to_filename(os.path.join(options['test_folder'], options['test_scan'], options['experiment'], options['test_name']))
return output_scan
def save_nifti(x, output, client):
filename = x[0].split('/')[-1]
im = nib.Nifti1Image(x[1][1], x[1][0])
nib.save(im, filename)
client.upload(output,filename, overwrite=True)
return (x[0], 0)
def get_data(x):
fh = nib.FileHolder(fileobj=GzipFile(fileobj=BytesIO(x[1])))
im = nib.Nifti1Image.from_file_map({'header': fh, 'image': fh})
return (x[0], (im.affine, im.get_data()))
def save_nifti(x):
filename = x[0].split('/')[-1]
im = nib.Nifti1Image(x[1][1], x[1][0])
nib.save(im, os.path.join('/run/user/1000', filename))
return filename
def get_data(x):
fh = nib.FileHolder(fileobj=GzipFile(fileobj=BytesIO(x[1])))
im = nib.Nifti1Image.from_file_map({'header': fh, 'image': fh})
return (x[0], (im.affine, im.get_data()))
def median(in_files):
"""Computes an average of the median of each realigned timeseries
Parameters
----------
in_files: one or more realigned Nifti 4D time series
Returns
-------
out_file: a 3D Nifti file
"""
average = None
for idx, filename in enumerate(filename_to_list(in_files)):
img = nb.load(filename)
data = np.median(img.get_data(), axis=3)
if average is None:
average = data
else:
average = average + data
median_img = nb.Nifti1Image(average / float(idx + 1), img.affine,
img.header)
filename = os.path.join(os.getcwd(), 'median.nii.gz')
median_img.to_filename(filename)
return filename
def bandpass_filter(files, lowpass_freq, highpass_freq, fs):
"""Bandpass filter the input files
Parameters
----------
files: list of 4d nifti files
lowpass_freq: cutoff frequency for the low pass filter (in Hz)
highpass_freq: cutoff frequency for the high pass filter (in Hz)
fs: sampling rate (in Hz)
"""
out_files = []
for filename in filename_to_list(files):
path, name, ext = split_filename(filename)
out_file = os.path.join(os.getcwd(), name + '_bp' + ext)
img = nb.load(filename)
timepoints = img.shape[-1]
F = np.zeros((timepoints))
lowidx = int(timepoints / 2) + 1
if lowpass_freq > 0:
lowidx = np.round(float(lowpass_freq) / fs * timepoints)
highidx = 0
if highpass_freq > 0:
highidx = np.round(float(highpass_freq) / fs * timepoints)
F[highidx:lowidx] = 1
F = ((F + F[::-1]) > 0).astype(int)
data = img.get_data()
if np.all(F == 1):
filtered_data = data
else:
filtered_data = np.real(np.fft.ifftn(np.fft.fftn(data) * F))
img_out = nb.Nifti1Image(filtered_data, img.affine, img.header)
img_out.to_filename(out_file)
out_files.append(out_file)
return list_to_filename(out_files)
def load_bval_bvec_dti(self, fbval, fbvec, dti_file, dti_file_out):
"""
Takes bval and bvec files and produces a structure in dipy format
**Positional Arguments:**
"""
# Load Data
img = nb.load(dti_file)
data = img.get_data()
bvals, bvecs = read_bvals_bvecs(fbval, fbvec)
# Get rid of spurrious scans
idx = np.where((bvecs[:, 0] == 100) & (bvecs[:, 1] == 100) &
(bvecs[:, 2] == 100))
bvecs = np.delete(bvecs, idx, axis=0)
bvals = np.delete(bvals, idx, axis=0)
data = np.delete(data, idx, axis=3)
# Save corrected DTI volume
dti_new = nb.Nifti1Image(data, affine=img.get_affine(),
header=img.get_header())
dti_new.update_header()
nb.save(dti_new, dti_file_out)
gtab = gradient_table(bvals, bvecs, atol=0.01)
print(gtab.info)
return gtab
def mask(in_file, mask_file, new_name):
import nibabel as nb
import os
in_nii = nb.load(in_file)
mask_nii = nb.load(mask_file)
data = in_nii.get_data()
data[mask_nii.get_data() == 0] = 0
new_nii = nb.Nifti1Image(data, in_nii.affine, in_nii.header)
new_nii.to_filename(new_name)
return os.path.abspath(new_name)
def _run_interface(self, runtime):
orig_file_nii = nb.load(self.inputs.in_file)
in_file_data = orig_file_nii.get_data()
# pad the data to avoid the mask estimation running into edge effects
in_file_data_padded = np.pad(in_file_data, (1, 1), 'constant',
constant_values=(0, 0))
padded_nii = nb.Nifti1Image(in_file_data_padded, orig_file_nii.affine,
orig_file_nii.header)
mask_nii = compute_epi_mask(padded_nii, exclude_zeros=True)
mask_data = mask_nii.get_data()
if isdefined(self.inputs.dilation):
mask_data = nd.morphology.binary_dilation(mask_data).astype(np.uint8)
# reverse image padding
mask_data = mask_data[1:-1, 1:-1, 1:-1]
# exclude zero and NaN voxels
mask_data[in_file_data == 0] = 0
mask_data[np.isnan(in_file_data)] = 0
better_mask = nb.Nifti1Image(mask_data, orig_file_nii.affine,
orig_file_nii.header)
better_mask.set_data_dtype(np.uint8)
better_mask.to_filename("mask_file.nii.gz")
self._mask_file = os.path.abspath("mask_file.nii.gz")
runtime.returncode = 0
return super(ComputeEPIMask, self)._run_interface(runtime)
microstruct_mapsNmetrics_ver1.py 文件源码
项目:pestilli-teaching-2017
作者: francopestilli
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def write_num_peaks(peaks, affine, out_name):
# also print out a volume for number peaks
# sum the values greater than 5 in the third dimension
img = nib.Nifti1Image(np.sum(peaks.peak_values > 0, 3), affine)
nib.save(img, out_name)
load_stanford_hardi.py 文件源码
项目:pestilli-teaching-2017
作者: francopestilli
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def main():
argParseObj = argparse.ArgumentParser(description="Load stanford hardi dataset")
# this is the whole dwi with all the volumes yo
argParseObj.add_argument('-dir', help="the path where you want"
"this stuff to be dumped",
type=str, nargs='?', required=True)
args = argParseObj.parse_args()
outputDir = args.dir
from dipy.data import read_stanford_labels
hardi_img, gtab, label_img = read_stanford_labels()
hardi_data = hardi_img.get_data()
hardi_write = nib.Nifti1Image(hardi_data, hardi_img.get_affine())
outName = ''.join([outputDir, 'stanfordHardi_dwi.nii.gz'])
nib.save(hardi_write, outName)
label_data = label_img.get_data()
label_write = nib.Nifti1Image(label_data, label_img.get_affine())
outName = ''.join([outputDir, 'stanfordHardi_fsLabels.nii.gz'])
nib.save(label_write, outName)
# from dipy.data import read_stanford_pve_maps
# csf_img , gm_img , wm_img = read_stanford_pve_maps()
from dipy.external import fsl
outName = ''.join([outputDir, 'stanfordHardi.'])
fsl.write_bvals_bvecs(gtab.bvals, gtab.bvecs, prefix=outName)
def _brain_mask(row, median_radius=4, numpass=4, autocrop=False,
vol_idx=None, dilate=None, force_recompute=False):
brain_mask_file = _get_fname(row, '_brain_mask.nii.gz')
if not op.exists(brain_mask_file) or force_recompute:
img = nib.load(row['dwi_file'])
data = img.get_data()
gtab = row['gtab']
mean_b0 = np.mean(data[..., ~gtab.b0s_mask], -1)
_, brain_mask = median_otsu(mean_b0, median_radius, numpass,
autocrop, dilate=dilate)
be_img = nib.Nifti1Image(brain_mask.astype(int),
img.affine)
nib.save(be_img, brain_mask_file)
return brain_mask_file