def test_loadtxt_fields_subarrays(self):
    # For ticket #1936
    if sys.version_info[0] >= 3:
        from io import StringIO
    else:
        from StringIO import StringIO

    dt = [("a", 'u1', 2), ("b", 'u1', 2)]
    x = np.loadtxt(StringIO("0 1 2 3"), dtype=dt)
    assert_equal(x, np.array([((0, 1), (2, 3))], dtype=dt))

    dt = [("a", [("a", 'u1', (1, 3)), ("b", 'u1')])]
    x = np.loadtxt(StringIO("0 1 2 3"), dtype=dt)
    assert_equal(x, np.array([(((0, 1, 2), 3),)], dtype=dt))

    dt = [("a", 'u1', (2, 2))]
    x = np.loadtxt(StringIO("0 1 2 3"), dtype=dt)
    assert_equal(x, np.array([(((0, 1), (2, 3)),)], dtype=dt))

    dt = [("a", 'u1', (2, 3, 2))]
    x = np.loadtxt(StringIO("0 1 2 3 4 5 6 7 8 9 10 11"), dtype=dt)
    data = [((((0, 1), (2, 3), (4, 5)), ((6, 7), (8, 9), (10, 11))),)]
    assert_equal(x, np.array(data, dtype=dt))
Python loadtxt() usage examples (source code)
def test_record(self):
    c = TextIO()
    c.write('1 2\n3 4')
    c.seek(0)
    x = np.loadtxt(c, dtype=[('x', np.int32), ('y', np.int32)])
    a = np.array([(1, 2), (3, 4)], dtype=[('x', 'i4'), ('y', 'i4')])
    assert_array_equal(x, a)

    d = TextIO()
    d.write('M 64.0 75.0\nF 25.0 60.0')
    d.seek(0)
    mydescriptor = {'names': ('gender', 'age', 'weight'),
                    'formats': ('S1', 'i4', 'f4')}
    b = np.array([('M', 64.0, 75.0),
                  ('F', 25.0, 60.0)], dtype=mydescriptor)
    y = np.loadtxt(d, dtype=mydescriptor)
    assert_array_equal(y, b)
def test_skiprows(self):
    c = TextIO()
    c.write('comment\n1,2,3,5\n')
    c.seek(0)
    x = np.loadtxt(c, dtype=int, delimiter=',',
                   skiprows=1)
    a = np.array([1, 2, 3, 5], int)
    assert_array_equal(x, a)

    c = TextIO()
    c.write('# comment\n1,2,3,5\n')
    c.seek(0)
    x = np.loadtxt(c, dtype=int, delimiter=',',
                   skiprows=1)
    a = np.array([1, 2, 3, 5], int)
    assert_array_equal(x, a)
def test_gzip_loadtxt():
    # Thanks to another Windows brokenness, we can't use
    # NamedTemporaryFile: a file created from that function cannot be
    # reopened by another open call. So we first build the gzipped bytes
    # of the test reference array, write them to a securely opened file,
    # and then read that file back with loadtxt.
    s = BytesIO()
    g = gzip.GzipFile(fileobj=s, mode='w')
    g.write(b'1 2 3\n')
    g.close()
    s.seek(0)

    with temppath(suffix='.gz') as name:
        with open(name, 'wb') as f:
            f.write(s.read())
        res = np.loadtxt(name)
    s.close()
    assert_array_equal(res, [1, 2, 3])
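The test above goes through a temporary file for its own reasons, but np.loadtxt itself recognizes the .gz extension and decompresses transparently. A minimal sketch of that shorter path (the file name is an illustrative placeholder):

import gzip
import numpy as np

# Write a small gzipped text file, then let loadtxt decompress it by extension.
with gzip.open('example_data.gz', 'wb') as g:   # hypothetical file name
    g.write(b'1 2 3\n')

values = np.loadtxt('example_data.gz')          # .gz files are decompressed automatically
print(values)                                    # [1. 2. 3.]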
def knn_masked_data(trX, trY, missing_data_dir, input_shape, k):
    raw_im_data = np.loadtxt(join(script_dir, missing_data_dir, 'index.txt'),
                             delimiter=' ', dtype=str)
    raw_mask_data = np.loadtxt(join(script_dir, missing_data_dir, 'index_mask.txt'),
                               delimiter=' ', dtype=str)
    # Using 'brute' method since we only want to do one query per classifier
    # so this will be quicker as it avoids overhead of creating a search tree
    knn_m = KNeighborsClassifier(algorithm='brute', n_neighbors=k)
    prob_Y_hat = np.zeros((raw_im_data.shape[0], int(np.max(trY) + 1)))
    total_images = raw_im_data.shape[0]
    pbar = progressbar.ProgressBar(
        widgets=[progressbar.FormatLabel('\rProcessed %(value)d of %(max)d Images '),
                 progressbar.Bar()],
        maxval=total_images, term_width=50).start()
    for i in range(total_images):
        mask_im = load_image(join(script_dir, missing_data_dir, raw_mask_data[i][0]),
                             input_shape, 1).reshape(np.prod(input_shape))
        mask = np.logical_not(mask_im > eps)  # since mask is 1 at missing locations
        v_im = load_image(join(script_dir, missing_data_dir, raw_im_data[i][0]),
                          input_shape, 255).reshape(np.prod(input_shape))
        rep_mask = np.tile(mask, (trX.shape[0], 1))
        # Corrupt whole training set according to the current mask
        corr_trX = np.multiply(trX, rep_mask)
        knn_m.fit(corr_trX, trY)
        prob_Y_hat[i, :] = knn_m.predict_proba(v_im.reshape(1, -1))
        pbar.update(i)
    pbar.finish()
    return prob_Y_hat
def quiz19_20():
    gamma_l = [32, 2, 0.125]
    lamb_l = [0.001, 1, 1000]
    data = np.loadtxt("hw2_lssvm_all.dat")
    x_train = data[:400, :-1]
    y_train = data[:400, -1].astype(int)
    x_test = data[400:, :-1]
    y_test = data[400:, -1].astype(int)
    n = len(y_train)
    print("gamma lamb e_in e_out")
    for gamma in gamma_l:
        for lamb in lamb_l:
            w = np.array(KRG(x_train, y_train, gamma, lamb, n)).flatten()
            e_in = err(x_train, y_train, (w, gamma, x_train))
            e_out = err(x_test, y_test, (w, gamma, x_train))
            print(gamma, " ", lamb, " ", e_in, " ", e_out)


# quiz19-20
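KRG and err are defined elsewhere in the original homework script. A minimal sketch of what they could look like, assuming a Gaussian-kernel (RBF) least-squares SVM / kernel ridge regression with ±1 labels and 0/1 error; the signatures mirror the calls above, but this is an illustrative reconstruction, not the original implementation:

import numpy as np

def KRG(x, y, gamma, lamb, n):
    # Hypothetical kernel ridge regression: beta = (lambda * I + K)^-1 y,
    # with K[i, j] = exp(-gamma * ||x_i - x_j||^2).
    sq_dists = np.sum((x[:, None, :] - x[None, :, :]) ** 2, axis=-1)
    K = np.exp(-gamma * sq_dists)
    return np.linalg.solve(lamb * np.eye(n) + K, y)

def err(x, y, model):
    # Hypothetical 0/1 error of sign(K(x, x_train) @ beta) against ±1 labels y.
    beta, gamma, x_train = model
    sq_dists = np.sum((x[:, None, :] - x_train[None, :, :]) ** 2, axis=-1)
    pred = np.sign(np.exp(-gamma * sq_dists).dot(beta))
    return np.mean(pred != y)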
def _fileToMatrix(file_name):
    """rudimentary method to read in data from a file"""
    # TODO: np.loadtxt() might be an alternative
    # try:
    if 1 < 3:
        lres = []
        for line in open(file_name, 'r').readlines():
            if len(line) > 0 and line[0] not in ('%', '#'):
                lres.append(list(map(float, line.split())))
        res = lres
        while res != [] and res[0] == []:  # remove further leading empty lines
            del res[0]
        return res
    # except:
    print('could not read file ' + file_name)
# ____________________________________________________________
# ____________________________________________________________
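As the TODO above notes, the hand-rolled reader can usually be replaced by np.loadtxt. A minimal sketch, assuming whitespace-delimited numeric rows and '%'/'#' comment lines; note that it returns an ndarray rather than a list of lists:

import numpy as np

def _file_to_matrix_loadtxt(file_name):
    # loadtxt skips lines starting with the given comment characters;
    # ndmin=2 keeps single-row files two-dimensional.
    return np.loadtxt(file_name, comments=('%', '#'), ndmin=2)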
def __init__(self, dictionary=None, topic_data=None, topic_file=None, style=None):
    if dictionary is None:
        raise ValueError('no dictionary!')
    if topic_data is not None:
        topics = topic_data
    elif topic_file is not None:
        topics = np.loadtxt('%s' % topic_file)
    else:
        raise ValueError('no topic data!')

    # sort topics
    topics_sums = np.sum(topics, axis=1)
    idx = np.argsort(topics_sums)[::-1]
    self.data = topics[idx]
    self.dictionary = dictionary

    if style is None:
        style = self.STYLE_GENSIM
    self.style = style
def GetTransitTimes(file='ttv_kruse.dat'):
    '''
    '''
    planet, _, time, dtime = np.loadtxt(os.path.join(TRAPPIST_DAT, file), unpack=True)
    transit_times = [None for i in range(7)]
    if file == 'ttv_kruse.dat':
        for i in range(7):
            inds = np.where(planet == i + 1)[0]
            transit_times[i] = time[inds] + (2455000 - 2454833)
    elif file == 'ttv_agol.dat':
        for i in range(6):
            inds = np.where(planet == i + 1)[0]
            transit_times[i] = time[inds] + (2450000 - 2454833)
            # Append a few extra for padding
            pad = [transit_times[i][-1] + np.median(np.diff(transit_times[i])),
                   transit_times[i][-1] + 2 * np.median(np.diff(transit_times[i])),
                   transit_times[i][-1] + 3 * np.median(np.diff(transit_times[i]))]
            transit_times[i] = np.append(transit_times[i], pad)
    return PlanetProperty(transit_times)
def load_uci_german_credits(path, n_train):
    if not os.path.isfile(path):
        data_dir = os.path.dirname(path)
        if not os.path.exists(os.path.dirname(path)):
            os.makedirs(data_dir)
        download_dataset('https://archive.ics.uci.edu/ml/'
                         'machine-learning-databases/statlog/'
                         'german/german.data-numeric', path)

    n_dims = 24
    data = np.loadtxt(path)
    x_train = data[:n_train, :n_dims]
    y_train = data[:n_train, n_dims] - 1
    x_test = data[n_train:, :n_dims]
    y_test = data[n_train:, n_dims] - 1
    return x_train, y_train, x_test, y_test
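A typical call might look like the following; the local path and the 700/300 split are illustrative placeholders, not part of the original snippet:

# Hypothetical usage: first 700 rows for training, the remaining rows for testing.
x_train, y_train, x_test, y_test = load_uci_german_credits(
    'data/german.data-numeric', n_train=700)
print(x_train.shape, y_train.shape)  # e.g. (700, 24) (700,)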
def load_uci_boston_housing(path, dtype=np.float32):
    if not os.path.isfile(path):
        data_dir = os.path.dirname(path)
        if not os.path.exists(os.path.dirname(path)):
            os.makedirs(data_dir)
        download_dataset('http://archive.ics.uci.edu/ml/'
                         'machine-learning-databases/housing/housing.data',
                         path)

    data = np.loadtxt(path)
    data = data.astype(dtype)
    permutation = np.random.choice(np.arange(data.shape[0]),
                                   data.shape[0], replace=False)
    size_train = int(np.round(data.shape[0] * 0.8))
    size_test = int(np.round(data.shape[0] * 0.9))
    index_train = permutation[0: size_train]
    index_test = permutation[size_train:size_test]
    index_val = permutation[size_test:]

    x_train, y_train = data[index_train, :-1], data[index_train, -1]
    x_val, y_val = data[index_val, :-1], data[index_val, -1]
    x_test, y_test = data[index_test, :-1], data[index_test, -1]
    return x_train, y_train, x_val, y_val, x_test, y_test
def applyTexture(x, y, texture=texture_input):
    text = imread(texture_input)
    height, width = text.shape[:2]
    xmin, ymin = amin(x), amin(y)
    xmax, ymax = amax(x), amax(y)
    scale = max(((xmax - xmin + 2) / height), ((ymax - ymin + 2) / width))
    text = imresize(text, scale)
    # print text.shape[:2]
    # print xmax - xmin + 2, ymax - ymin + 2
    X = (x - xmin).astype(int)
    Y = (y - ymin).astype(int)
    val1 = color.rgb2lab((text[X, Y] / 255.).reshape(len(X), 1, 3)).reshape(len(X), 3)
    val2 = color.rgb2lab((im[x, y] / 255.).reshape(len(x), 1, 3)).reshape(len(x), 3)
    L, A, B = mean(val2[:, 0]), mean(val2[:, 1]), mean(val2[:, 2])
    val2[:, 0] = np.clip(val2[:, 0] - L + val1[:, 0], 0, 100)
    val2[:, 1] = np.clip(val2[:, 1] - A + val1[:, 1], -127, 128)
    val2[:, 2] = np.clip(val2[:, 2] - B + val1[:, 2], -127, 128)
    im[x, y] = color.lab2rgb(val2.reshape(len(x), 1, 3)).reshape(len(x), 3) * 255

# points = np.loadtxt('nailpoint_5')
def read_data_tri(filename):
    data = numpy.loadtxt(filename)
    if len(data.shape) == 1:
        data = numpy.array([data])
    points = data[:, :2]
    weights = data[:, 2]

    # The reference triangle is (-1, -1), (1, -1), (-1, 1). Transform the
    # points to barycentric coordinates.
    points += 1.0
    points *= 0.5
    points = numpy.array([
        points[:, 0],
        points[:, 1],
        1.0 - numpy.sum(points, axis=1)
        ]).T
    return points, weights * 0.5
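The explicit promotion of a single row to a 2-D array can also be handled by loadtxt itself via its ndmin argument. A minimal sketch of that variant (the file name is a placeholder):

import numpy
# ndmin=2 guarantees a 2-D result even when the file holds a single row,
# so the shape check above becomes unnecessary.
data = numpy.loadtxt("quadrature_points.txt", ndmin=2)  # hypothetical file name
points, weights = data[:, :2], data[:, 2]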
convert_to_timeseries.py — source from the Python-Machine-Learning-Cookbook project (author: PacktPublishing)
def convert_data_to_timeseries(input_file, column, verbose=False):
    # Load the input file
    data = np.loadtxt(input_file, delimiter=',')

    # Extract the start and end dates
    start_date = str(int(data[0, 0])) + '-' + str(int(data[0, 1]))
    end_date = str(int(data[-1, 0] + 1)) + '-' + str(int(data[-1, 1] % 12 + 1))

    if verbose:
        print("\nStart date =", start_date)
        print("End date =", end_date)

    # Create a date sequence with monthly intervals
    dates = pd.date_range(start_date, end_date, freq='M')

    # Convert the data into time series data
    data_timeseries = pd.Series(data[:, column], index=dates)

    if verbose:
        print("\nTime series data:\n", data_timeseries[:10])

    return data_timeseries
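A minimal usage sketch; the CSV name and column index are placeholders, and the input is assumed to hold year and month in its first two columns, as the date extraction above expects:

# Hypothetical usage of the converter defined above.
ts = convert_data_to_timeseries('data_timeseries.txt', column=2, verbose=True)
print(ts.head())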
def test_data_sizes(self):
    """Test that different numbers of bits give the correct throughput size"""
    for iterate in range(5):
        nbit = 2**iterate
        if nbit == 8:
            continue
        self.blocks[0] = (
            SigprocReadBlock(
                './data/2chan' + str(nbit) + 'bitNoDM.fil'),
            [], [0])
        open(self.logfile, 'w').close()
        Pipeline(self.blocks).main()
        number_fftd = np.loadtxt(self.logfile).astype(np.float32).view(np.complex64).size
        # Compare with simple copy
        self.blocks[1] = (CopyBlock(), [0], [1])
        open(self.logfile, 'w').close()
        Pipeline(self.blocks).main()
        number_copied = np.loadtxt(self.logfile).size
        self.assertEqual(number_fftd, number_copied)
        # Go back to FFT
        self.blocks[1] = (FFTBlock(gulp_size=4096 * 8 * 8 * 8), [0], [1])
def test_equivalent_data_to_copy(self):
    """Test that the data coming out of this pipeline is equivalent to
    the initial read data"""
    self.logfile = '.log.txt'
    self.blocks = []
    self.blocks.append((
        SigprocReadBlock(
            './data/1chan8bitNoDM.fil'),
        [], [0]))
    self.blocks.append((FFTBlock(gulp_size=4096 * 8 * 8 * 8 * 8), [0], [1]))
    self.blocks.append((IFFTBlock(gulp_size=4096 * 8 * 8 * 8 * 8), [1], [2]))
    self.blocks.append((WriteAsciiBlock(self.logfile), [2], []))
    open(self.logfile, 'w').close()
    Pipeline(self.blocks).main()
    unfft_result = np.loadtxt(self.logfile).astype(np.float32).view(np.complex64)

    self.blocks[1] = (CopyBlock(), [0], [1])
    self.blocks[2] = (WriteAsciiBlock(self.logfile), [1], [])
    del self.blocks[3]
    open(self.logfile, 'w').close()
    Pipeline(self.blocks).main()
    untouched_result = np.loadtxt(self.logfile).astype(np.float32)
    np.testing.assert_almost_equal(unfft_result, untouched_result, 2)
def load_single_voxel_grid(self, path):
    temp = re.split('_', path.split('.')[-2])
    x_d = int(temp[len(temp) - 3])
    y_d = int(temp[len(temp) - 2])
    z_d = int(temp[len(temp) - 1])

    a = np.loadtxt(path)
    if len(a) <= 0:
        print(" load_single_voxel_grid error: ", path)
        exit()

    voxel_grid = np.zeros((x_d, y_d, z_d, 1))
    for i in a:
        voxel_grid[int(i[0]), int(i[1]), int(i[2]), 0] = 1  # occupied
    # Data.plotFromVoxels(voxel_grid)
    voxel_grid = self.voxel_grid_padding(voxel_grid)
    return voxel_grid
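The per-row loop can be replaced with a single fancy-indexing assignment; a minimal sketch, assuming the loaded array is 2-D with at least three columns of voxel indices (illustrative only):

# Vectorized equivalent of the per-row loop above.
idx = a[:, :3].astype(int)
voxel_grid[idx[:, 0], idx[:, 1], idx[:, 2], 0] = 1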
def calculate_loss_distill(self, predictions, labels_distill, labels, **unused_params):
    with tf.name_scope("loss_distill"):
        print("loss_distill")
        epsilon = 10e-6
        float_labels = tf.cast(labels, tf.float32)
        float_labels_distill = tf.cast(labels_distill, tf.float32)
        embedding_mat = np.loadtxt("./resources/embedding_matrix.model")
        vocab_size = embedding_mat.shape[1]
        labels_size = float_labels.get_shape().as_list()[1]
        embedding_mat = tf.cast(embedding_mat, dtype=tf.float32)
        cross_entropy_loss_1 = float_labels * tf.log(predictions + epsilon) + (
            1 - float_labels) * tf.log(1 - predictions + epsilon)
        float_labels_1 = float_labels[:, :vocab_size]
        labels_smooth = tf.matmul(float_labels_1, embedding_mat) / tf.reduce_sum(
            float_labels_1, axis=1, keep_dims=True)
        float_classes = labels_smooth
        for i in range(labels_size // vocab_size - 1):
            float_classes = tf.concat((float_classes, labels_smooth), axis=1)
        cross_entropy_loss_2 = float_classes * tf.log(predictions + epsilon) + (
            1 - float_classes) * tf.log(1 - predictions + epsilon)
        cross_entropy_loss_3 = float_labels_distill * tf.log(predictions + epsilon) + (
            1 - float_labels_distill) * tf.log(1 - predictions + epsilon)
        cross_entropy_loss = cross_entropy_loss_1 * 0.5 + cross_entropy_loss_2 * 0.5 + cross_entropy_loss_3 * 0.5

        cross_entropy_loss = tf.negative(cross_entropy_loss)
        return tf.reduce_mean(tf.reduce_sum(cross_entropy_loss, 1))
def calculate_loss_negative(self, predictions_pos, predictions_neg, labels, **unused_params):
    with tf.name_scope("loss_negative"):
        epsilon = 10e-6
        float_labels = tf.cast(labels, tf.float32)
        weight_pos = np.loadtxt(FLAGS.autoencoder_dir + "labels_uni.out")
        weight_pos = tf.reshape(tf.cast(weight_pos, dtype=tf.float32), [1, -1])
        weight_pos = tf.log(tf.reduce_max(weight_pos) / weight_pos) + 1
        cross_entropy_loss_1 = float_labels * tf.log(predictions_pos + epsilon) * weight_pos + (
            1 - float_labels) * tf.log(1 - predictions_pos + epsilon)
        cross_entropy_loss_2 = (1 - float_labels) * tf.log(predictions_neg + epsilon) + \
            float_labels * tf.log(1 - predictions_neg + epsilon)
        cross_entropy_loss = tf.negative(cross_entropy_loss_1 + cross_entropy_loss_2)
        return tf.reduce_mean(tf.reduce_sum(cross_entropy_loss, 1))
def calculate_loss_mix(self, predictions, predictions_class, labels, **unused_params):
    with tf.name_scope("loss_mix"):
        float_labels = tf.cast(labels, tf.float32)
        if FLAGS.support_type == "class":
            seq = np.loadtxt(FLAGS.class_file)
            tf_seq = tf.one_hot(tf.constant(seq, dtype=tf.int32), FLAGS.encoder_size)
            float_classes_org = tf.matmul(float_labels, tf_seq)
            class_true = tf.ones(tf.shape(float_classes_org))
            class_false = tf.zeros(tf.shape(float_classes_org))
            float_classes = tf.where(tf.greater(float_classes_org, class_false), class_true, class_false)
            cross_entropy_class = self.calculate_loss(predictions_class, float_classes)
        elif FLAGS.support_type == "frequent":
            float_classes = float_labels[:, 0:FLAGS.encoder_size]
            cross_entropy_class = self.calculate_loss(predictions_class, float_classes)
        elif FLAGS.support_type == "encoder":
            float_classes = float_labels
            for i in range(FLAGS.encoder_layers):
                var_i = np.loadtxt(FLAGS.autoencoder_dir + 'autoencoder_layer%d.model' % i)
                weight_i = tf.constant(var_i[:-1, :], dtype=tf.float32)
                bias_i = tf.reshape(tf.constant(var_i[-1, :], dtype=tf.float32), [-1])
                float_classes = tf.nn.xw_plus_b(float_classes, weight_i, bias_i)
                if i < FLAGS.encoder_layers - 1:
                    float_classes = tf.nn.relu(float_classes)
                else:
                    float_classes = tf.nn.sigmoid(float_classes)
                    # float_classes = tf.nn.relu(tf.sign(float_classes - 0.5))
            cross_entropy_class = self.calculate_mseloss(predictions_class, float_classes)
        else:
            float_classes = float_labels
            for i in range(FLAGS.moe_layers - 1):
                float_classes = tf.concat((float_classes, float_labels), axis=1)
            cross_entropy_class = self.calculate_loss(predictions_class, float_classes)
        cross_entropy_loss = self.calculate_loss(predictions, labels)
        return cross_entropy_loss + 0.1 * cross_entropy_class