def check_subjects(subjects_info):
    """Ensure subjects are provided and their data exist."""
    if isinstance(subjects_info, str):
        if not pexists(subjects_info):
            raise IOError('path to subject list does not exist: {}'.format(subjects_info))
        subjects_list = np.genfromtxt(subjects_info, dtype=str)
    elif isinstance(subjects_info, collections.abc.Iterable):
        if len(subjects_info) < 1:
            raise ValueError('Empty subject list.')
        subjects_list = subjects_info
    else:
        raise ValueError('Invalid value provided for subject list.\n'
                         'Must be a list of paths, or path to a file containing a list of paths, one per subject.')
    subject_id_list = np.atleast_1d(subjects_list)
    num_subjects = subject_id_list.size
    if num_subjects < 1:
        raise ValueError('Input subject list is empty.')
    num_digits_id_size = len(str(num_subjects))
    max_id_width = max(map(len, subject_id_list))
    return subject_id_list, num_subjects, max_id_width, num_digits_id_size
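A minimal usage sketch (assuming, as the snippet implies, that np is numpy, collections is imported, and pexists aliases os.path.exists):

import collections.abc
import numpy as np
from os.path import exists as pexists

# Pass an in-memory list of subject IDs; no file on disk is needed.
ids, n, width, digits = check_subjects(['sub-01', 'sub-02', 'sub-10'])
print(n, width, digits)  # 3 6 1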
Python genfromtxt() examples: source code
def write_preprocessed_data(output_directory, cell_IDs, cell_stages, data, markers):
    processed_data_path = path.join(output_directory, 'processed_data.tsv')
    with open(processed_data_path, 'w') as f:
        f.write('\t'.join(cell_IDs))
        f.write('\n')
        f.write('\t'.join(cell_stages))
        f.write('\n')
        np.savetxt(f, data.T, fmt='%.6f', delimiter='\t')
    dataset = np.genfromtxt(processed_data_path, delimiter='\t', dtype=str)
    dataset = np.insert(dataset, 0, np.append(['Cell ID', 'Stage'], markers), axis=1)
    with open(processed_data_path, 'w') as f:
        np.savetxt(f, dataset, fmt='%s', delimiter='\t')
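A self-contained run of write_preprocessed_data (a sketch; it assumes numpy as np and os.path imported as path, and data shaped cells x markers so that data.T writes one row per marker):

import tempfile
import numpy as np
from os import path

with tempfile.TemporaryDirectory() as tmpdir:
    cell_IDs = ['c1', 'c2', 'c3']
    cell_stages = ['G1', 'S', 'G2']
    markers = ['m1', 'm2']
    data = np.random.rand(3, 2)  # cells x markers
    write_preprocessed_data(tmpdir, cell_IDs, cell_stages, data, markers)
    print(open(path.join(tmpdir, 'processed_data.tsv')).read())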
def test_skip_footer_with_invalid(self):
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore")
        basestr = '1 1\n2 2\n3 3\n4 4\n5 \n6 \n7 \n'
        # Footer too small to get rid of all invalid values
        assert_raises(ValueError, np.genfromtxt,
                      TextIO(basestr), skip_footer=1)
        a = np.genfromtxt(
            TextIO(basestr), skip_footer=1, invalid_raise=False)
        assert_equal(a, np.array([[1., 1.], [2., 2.], [3., 3.], [4., 4.]]))
        #
        a = np.genfromtxt(TextIO(basestr), skip_footer=3)
        assert_equal(a, np.array([[1., 1.], [2., 2.], [3., 3.], [4., 4.]]))
        #
        basestr = '1 1\n2 \n3 3\n4 4\n5 \n6 6\n7 7\n'
        a = np.genfromtxt(
            TextIO(basestr), skip_footer=1, invalid_raise=False)
        assert_equal(a, np.array([[1., 1.], [3., 3.], [4., 4.], [6., 6.]]))
        a = np.genfromtxt(
            TextIO(basestr), skip_footer=3, invalid_raise=False)
        assert_equal(a, np.array([[1., 1.], [3., 3.], [4., 4.]]))
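The behaviour exercised above can be reproduced outside the test suite (a sketch using io.StringIO in place of the suite's TextIO helper):

import io
import warnings
import numpy as np

text = '1 1\n2 2\n3 3\n4 4\n5 \n6 \n7 \n'
with warnings.catch_warnings():
    warnings.filterwarnings("ignore")  # numpy warns about the skipped lines
    # Rows 5-7 hold a single value each; invalid_raise=False drops them instead of raising.
    a = np.genfromtxt(io.StringIO(text), skip_footer=1, invalid_raise=False)
print(a)  # [[1. 1.] [2. 2.] [3. 3.] [4. 4.]]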
def test_commented_header(self):
    # Check that names can be retrieved even if the line is commented out.
    data = TextIO("""
#gender age weight
M 21 72.100000
F 35 58.330000
M 33 21.99
""")
    # The # is part of the first name and should be deleted automatically.
    test = np.genfromtxt(data, names=True, dtype=None)
    ctrl = np.array([('M', 21, 72.1), ('F', 35, 58.33), ('M', 33, 21.99)],
                    dtype=[('gender', '|S1'), ('age', int), ('weight', float)])
    assert_equal(test, ctrl)
    # Ditto, but we should get rid of the first element
    data = TextIO(b"""
# gender age weight
M 21 72.100000
F 35 58.330000
M 33 21.99
""")
    test = np.genfromtxt(data, names=True, dtype=None)
    assert_equal(test, ctrl)
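The same name-recovery behaviour, standalone (a sketch; encoding='utf-8' makes genfromtxt return str rather than bytes):

import io
import numpy as np

text = "#gender age weight\nM 21 72.1\nF 35 58.33\n"
arr = np.genfromtxt(io.StringIO(text), names=True, dtype=None, encoding="utf-8")
print(arr.dtype.names)  # ('gender', 'age', 'weight'): the leading '#' is dropped
print(arr['weight'])    # [72.1  58.33]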
def test_dtype_with_object(self):
    # Test using an explicit dtype with an object
    data = """ 1; 2001-01-01
               2; 2002-01-31 """
    ndtype = [('idx', int), ('code', object)]
    func = lambda s: strptime(s.strip(), "%Y-%m-%d")
    converters = {1: func}
    test = np.genfromtxt(TextIO(data), delimiter=";", dtype=ndtype,
                         converters=converters)
    control = np.array(
        [(1, datetime(2001, 1, 1)), (2, datetime(2002, 1, 31))],
        dtype=ndtype)
    assert_equal(test, control)
    ndtype = [('nest', [('idx', int), ('code', object)])]
    try:
        test = np.genfromtxt(TextIO(data), delimiter=";",
                             dtype=ndtype, converters=converters)
    except NotImplementedError:
        pass
    else:
        errmsg = "Nested dtype involving objects should be supported."
        raise AssertionError(errmsg)
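A standalone sketch of the converters mechanism (io.StringIO stands in for TextIO, and datetime.strptime replaces the test's strptime import):

import io
from datetime import datetime
import numpy as np

data = "1; 2001-01-01\n2; 2002-01-31"
conv = {1: lambda s: datetime.strptime(s.strip(), "%Y-%m-%d")}
arr = np.genfromtxt(io.StringIO(data), delimiter=";",
                    dtype=[('idx', int), ('code', object)],
                    converters=conv, encoding="utf-8")
print(arr['code'][0])  # 2001-01-01 00:00:00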
def test_replace_space(self):
    # Test the 'replace_space' option
    txt = "A.A, B (B), C:C\n1, 2, 3.14"
    # Test default: replace ' ' by '_' and delete non-alphanum chars
    test = np.genfromtxt(TextIO(txt),
                         delimiter=",", names=True, dtype=None)
    ctrl_dtype = [("AA", int), ("B_B", int), ("CC", float)]
    ctrl = np.array((1, 2, 3.14), dtype=ctrl_dtype)
    assert_equal(test, ctrl)
    # Test: no replace, no delete
    test = np.genfromtxt(TextIO(txt),
                         delimiter=",", names=True, dtype=None,
                         replace_space='', deletechars='')
    ctrl_dtype = [("A.A", int), ("B (B)", int), ("C:C", float)]
    ctrl = np.array((1, 2, 3.14), dtype=ctrl_dtype)
    assert_equal(test, ctrl)
    # Test: no delete (spaces are replaced by _)
    test = np.genfromtxt(TextIO(txt),
                         delimiter=",", names=True, dtype=None,
                         deletechars='')
    ctrl_dtype = [("A.A", int), ("B_(B)", int), ("C:C", float)]
    ctrl = np.array((1, 2, 3.14), dtype=ctrl_dtype)
    assert_equal(test, ctrl)
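The default name sanitisation, demonstrated standalone (a sketch):

import io
import numpy as np

txt = "A.A, B (B), C:C\n1, 2, 3.14"
arr = np.genfromtxt(io.StringIO(txt), delimiter=",", names=True, dtype=None)
print(arr.dtype.names)  # ('AA', 'B_B', 'CC'): spaces become '_', punctuation is deleted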
def test_names_with_usecols_bug1636(self):
    # Make sure we pick up the right names w/ usecols
    data = "A,B,C,D,E\n0,1,2,3,4\n0,1,2,3,4\n0,1,2,3,4"
    ctrl_names = ("A", "C", "E")
    test = np.genfromtxt(TextIO(data),
                         dtype=(int, int, int), delimiter=",",
                         usecols=(0, 2, 4), names=True)
    assert_equal(test.dtype.names, ctrl_names)
    #
    test = np.genfromtxt(TextIO(data),
                         dtype=(int, int, int), delimiter=",",
                         usecols=("A", "C", "E"), names=True)
    assert_equal(test.dtype.names, ctrl_names)
    #
    test = np.genfromtxt(TextIO(data),
                         dtype=int, delimiter=",",
                         usecols=("A", "C", "E"), names=True)
    assert_equal(test.dtype.names, ctrl_names)
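Selecting columns by name, standalone (a sketch of the behaviour that bug 1636 verified):

import io
import numpy as np

data = "A,B,C,D,E\n0,1,2,3,4\n5,6,7,8,9"
arr = np.genfromtxt(io.StringIO(data), delimiter=",",
                    usecols=("A", "C", "E"), names=True, dtype=int)
print(arr.dtype.names)  # ('A', 'C', 'E')
print(arr['C'])         # [2 7]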
def test_gft_using_filename(self):
    # Test that we can load data from a filename as well as a file
    # object
    tgt = np.arange(6).reshape((2, 3))
    if sys.version_info[0] >= 3:
        # python 3k is known to fail for '\r'
        linesep = ('\n', '\r\n')
    else:
        linesep = ('\n', '\r\n', '\r')
    for sep in linesep:
        data = '0 1 2' + sep + '3 4 5'
        with temppath() as name:
            with open(name, 'w') as f:
                f.write(data)
            res = np.genfromtxt(name)
            assert_array_equal(res, tgt)
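Passing a filename rather than a file object, standalone (a sketch; delete=False keeps the temp file readable on all platforms):

import os
import tempfile
import numpy as np

with tempfile.NamedTemporaryFile('w', suffix='.txt', delete=False) as f:
    f.write('0 1 2\n3 4 5')
    name = f.name
print(np.genfromtxt(name))  # [[0. 1. 2.] [3. 4. 5.]]
os.remove(name)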
def read_file_to_np(self, file_name):
    datatype = [('time', np.float32), ('ax', np.int16), ('ay', np.int16), ('az', np.int16),
                ('gx', np.int16), ('gy', np.int16), ('gz', np.int16),
                ('mx', np.int16), ('my', np.int16), ('mz', np.int16),
                ('time_diff', np.float32)]
    data = np.genfromtxt(file_name, dtype=datatype, delimiter="\t")
    data['time'] = data['time'] - data['time'][0]
    a = np.diff(data['time'])
    time_diff_array = np.insert(a, 0, 0)
    data['time_diff'] = time_diff_array
    # Scale the raw magnetometer readings (apparently sensor-specific calibration factors)
    data['mx'] = data['mx'] * 1.18359375
    data['my'] = data['my'] * 1.19140625
    data['mz'] = data['mz'] * 1.14453125
    return data
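The method can be exercised on synthetic data (a sketch; it assumes the log file has eleven tab-separated columns, the last a placeholder for time_diff, and since self is unused we pass None):

import io
import numpy as np

text = ("0.00\t1\t1\t1\t1\t1\t1\t100\t100\t100\t0\n"
        "0.05\t2\t2\t2\t2\t2\t2\t110\t110\t110\t0\n")
data = read_file_to_np(None, io.StringIO(text))
print(data['time_diff'])  # [0.   0.05]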
def get_overlapping_files(self, path, ra, dec, width):
    """
    This function ...
    :param path: path to the directory with the images
    :param ra:
    :param dec:
    :param width:
    :return:
    """
    # Generate the meta and then the overlap file
    meta_path, overlap_path = self.generate_meta_and_overlap_file(path, ra, dec, width)
    # Load the overlap table
    overlap_files = np.genfromtxt(overlap_path, skip_header=3, usecols=[32], dtype="S500")
    # Return the names of the overlapping images
    return overlap_files

# -----------------------------------------------------------------
def read_gpl(self):
    dtype = [('waves', float)] + [('spec%i' % (i + 1), float) for i in range(len(self.age))]
    self.sed = np.genfromtxt(self.workdir + self.csp_output + '.spec', dtype=dtype)
    age3, Q = np.genfromtxt(self.workdir + self.csp_output + '.3color', usecols=(0, 5), unpack=True)
    age4, M = np.genfromtxt(self.workdir + self.csp_output + '.4color', usecols=(0, 6), unpack=True)
    for x, age in zip(self.sed.dtype.names[1:], self.age):
        # Scale to erg/s (3.839e33 is the solar luminosity) and apply the
        # Lyman-continuum escape fraction below 912 Angstroms
        self.sed[x] = self.sed[x] * 3.839e33
        self.sed[x][self.sed["waves"] < 912.] = self.sed[x][self.sed["waves"] < 912.] * self.lyc_esc
        # Pick the tabulated Q and M values closest in log(age)
        log_age = np.log10(age * 1e9)
        diff = abs(age3 - log_age)
        self.Q[x] = Q[diff == min(diff)][0]
        diff = abs(age4 - log_age)
        self.M_unnorm[x] = M[diff == min(diff)][0]
def main(opts):
    vertices = np.genfromtxt('points.dat', delimiter=' ', skip_header=1)
    npoints, dim = vertices.shape
    assert dim == 3
    # Indices must be integers to index into the vertex array
    faces = np.genfromtxt('indices.dat', delimiter=' ', dtype=int)  # Generated from alpha_shape
    # Create the mesh
    cube = mesh.Mesh(np.zeros(faces.shape[0], dtype=mesh.Mesh.dtype))
    for i, f in enumerate(faces):
        for j in range(3):
            cube.vectors[i][j] = vertices[f[j], :]
    # Write the mesh to file
    cube.save(opts.new_file_name)
def read_array(filename):
    '''Read array and convert to 2d np arrays'''
    array = np.genfromtxt(filename, dtype=float)
    if len(array.shape) == 1:
        array = array.reshape(-1, 1)
    return array
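A quick check of the 1-D-to-column promotion (a sketch):

import io
import numpy as np

# A single-column file comes back 1-D from genfromtxt; read_array reshapes it to (n, 1).
col = read_array(io.StringIO("1.0\n2.0\n3.0"))
print(col.shape)  # (3, 1)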
def file_to_array(filename, verbose=False):
    '''Converts a file to a list of lists of STRING.
    It differs from np.genfromtxt in that the number of columns doesn't need to be constant.'''
    data = []
    with open(filename, "r") as data_file:
        if verbose:
            print("Reading {}...".format(filename))
        lines = data_file.readlines()
        if verbose:
            print("Converting {} to correct array...".format(filename))
        data = [line.strip().split() for line in lines]
        del lines  # djajetic 11.11.2015 questionable
    return data
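A quick demonstration of the ragged rows that genfromtxt would reject (a sketch):

import os
import tempfile

with tempfile.NamedTemporaryFile('w', suffix='.txt', delete=False) as f:
    f.write("a b c\nd e\nf\n")
    name = f.name
print(file_to_array(name))  # [['a', 'b', 'c'], ['d', 'e'], ['f']]
os.remove(name)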
def load_iris():
    try:
        # Load Iris dataset from the sklearn.datasets package
        from sklearn import datasets
        from sklearn import decomposition
        # Load Dataset
        iris = datasets.load_iris()
        X = iris.data
        y = iris.target
        labels = iris.target_names
        # Reduce components by Principal Component Analysis from sklearn
        X = decomposition.PCA(n_components=3).fit_transform(X)
    except ImportError:
        # Load Iris dataset manually
        path = os.path.join('data', 'iris', 'iris.data')
        iris_data = np.genfromtxt(path, dtype='str', delimiter=',')
        X = iris_data[:, :4].astype(dtype=float)
        y = np.ndarray((X.shape[0],), dtype=int)
        # Create target vector y and corresponding labels
        labels, idx = [], 0
        for i, label in enumerate(iris_data[:, 4]):
            label = label.split('-')[1]
            if label not in labels:
                labels.append(label)
                idx += 1
            y[i] = idx - 1
        # Reduce components by implemented Principal Component Analysis
        X = PCA(X, 3)[0]
    return X, y, labels
def read_model_table(modelfile):
    '''
    This reads a downloaded TRILEGAL model file.
    '''
    with gzip.open(modelfile) as infd:
        model = np.genfromtxt(infd, names=True)
    return model
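A self-contained check with a made-up, gzip-compressed two-column table (the column names logAge and Mass are hypothetical, not TRILEGAL's actual header; assumes gzip and numpy are in scope as above):

import gzip
import os
import tempfile

with tempfile.NamedTemporaryFile(suffix='.dat.gz', delete=False) as f:
    name = f.name
with gzip.open(name, 'wt') as g:
    g.write("logAge Mass\n6.6 0.95\n6.7 1.10\n")  # hypothetical columns
model = read_model_table(name)
print(model['Mass'])  # [0.95 1.1 ]
os.remove(name)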
def test_stats2():
    """Test stats2 func from fluxpart.util"""
    data = "7 8 4\n6 1 3\n10 6 6\n6 7 3\n8 2 4"
    dtype = [('v0', int), ('v1', int), ('v2', int)]
    arr = np.genfromtxt(io.BytesIO(data.encode()), dtype=dtype)
    ans = stats2(arr)
    npt.assert_allclose(ans.ave_v0, 37 / 5)
    npt.assert_allclose(ans.ave_v1, 24 / 5)
    npt.assert_allclose(ans.ave_v2, 4)
    npt.assert_allclose(ans.var_v0, 14 / 5)
    npt.assert_allclose(ans.var_v1, 97 / 10)
    npt.assert_allclose(ans.var_v2, 3 / 2)
    npt.assert_allclose(ans.cov_v0_v1, 3 / 5)
    npt.assert_allclose(ans.cov_v0_v2, 2)
    npt.assert_allclose(ans.cov_v1_v0, ans.cov_v0_v1)
    npt.assert_allclose(ans.cov_v1_v2, 1)
    npt.assert_allclose(ans.cov_v2_v0, ans.cov_v0_v2)
    npt.assert_allclose(ans.cov_v2_v1, ans.cov_v1_v2)
    data = "7 8 4\n6 1 3\n10 6 6\n6 7 3\n8 2 4"
    dtype = [('v0', int), ('v1', int), ('v2', int)]
    arr = np.genfromtxt(io.BytesIO(data.encode()), dtype=dtype)
    ans = stats2(arr, names=('v0', 'v2'))
    npt.assert_allclose(ans.ave_v0, 37 / 5)
    npt.assert_allclose(ans.ave_v2, 4)
    npt.assert_allclose(ans.var_v0, 14 / 5)
    npt.assert_allclose(ans.var_v2, 3 / 2)
    npt.assert_allclose(ans.cov_v0_v2, 2)
    npt.assert_allclose(ans.cov_v2_v0, ans.cov_v0_v2)
    assert not hasattr(ans, 'ave_v1')
    assert not hasattr(ans, 'var_v1')
    assert not hasattr(ans, 'cov_v0_v1')
    assert not hasattr(ans, 'cov_v1_v0')
    assert not hasattr(ans, 'cov_v1_v2')
    assert not hasattr(ans, 'cov_v2_v1')
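For reference, the expected values can be cross-checked with plain numpy, independent of fluxpart (a sketch):

import io
import numpy as np

data = "7 8 4\n6 1 3\n10 6 6\n6 7 3\n8 2 4"
arr = np.genfromtxt(io.BytesIO(data.encode()))  # plain 5x3 float array
print(arr.mean(axis=0))           # [7.4 4.8 4. ]  == 37/5, 24/5, 4
print(np.cov(arr, rowvar=False))  # sample (ddof=1) covariances; var_v0 = 2.8 == 14/5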
def merge_results(sol, files):
    model = get_model_type(sol)
    save_where = '/Batch results/'
    working_path = getcwd().replace("\\", "/") + "/"
    save_path = working_path + save_where
    print("\nChecking for longest csv file")
    lengths = []
    for f in files:
        to_merge_temp = working_path + "/Results/%s/INV_%s-%s_%s.csv" % (f, sol.model, model, f)
        headers_temp = np.genfromtxt(to_merge_temp, delimiter=",", dtype=str, skip_footer=1)
        lengths.append(len(headers_temp))
    longest = files[lengths.index(max(lengths))]
    to_merge_max = working_path + "/Results/%s/INV_%s-%s_%s.csv" % (longest, sol.model, model, longest)
    headers = np.genfromtxt(to_merge_max, delimiter=",", dtype=str, skip_footer=1)
    print("\nMerging csv files")
    if not path.exists(save_path):
        makedirs(save_path)
    merged_inv_results = np.zeros((len(files), len(headers)))
    merged_inv_results.fill(np.nan)
    for i, f in enumerate(files):
        to_add = np.loadtxt(working_path + "/Results/%s/INV_%s-%s_%s.csv" % (f, sol.model, model, f), delimiter=",", skiprows=1)
        merged_inv_results[i][:to_add.shape[0]] = to_add
    rows = np.array(files, dtype=str)[:, np.newaxis]
    hd = ",".join(["ID"] + list(headers))
    np.savetxt(save_path + "Merged_%s-%s_%s_TO_%s.csv" % (sol.model, model, files[0], files[-1]),
               np.hstack((rows, merged_inv_results)), delimiter=",", header=hd, fmt="%s")
    print("Batch file successfully saved in:\n", save_path)