def test_load_table_creates(self, con, not_a_table):
pd = pytest.importorskip("pandas")
import numpy as np
data = pd.DataFrame({
"boolean_": [True, False],
"smallint_cast": np.array([0, 1], dtype=np.int8),
"smallint_": np.array([0, 1], dtype=np.int16),
"int_": np.array([0, 1], dtype=np.int32),
"bigint_": np.array([0, 1], dtype=np.int64),
"float_": np.array([0, 1], dtype=np.float32),
"double_": np.array([0, 1], dtype=np.float64),
"varchar_": ["a", "b"],
"text_": ['a', 'b'],
"time_": [datetime.time(0, 11, 59), datetime.time(13)],
"timestamp_": [pd.Timestamp("2016"), pd.Timestamp("2017")],
"date_": [datetime.date(2016, 1, 1), datetime.date(2017, 1, 1)],
}, columns=['boolean_', 'smallint_', 'int_', 'bigint_', 'float_',
'double_', 'varchar_', 'text_', 'time_', 'timestamp_',
'date_'])
con.load_table(not_a_table, data, create=True)
python类int16()的实例源码
def wavWrite(y, fs, nbits, audioFile):
""" Write samples to WAV file
Args:
samples: (ndarray / 2D ndarray) (floating point) sample vector
mono: DIM: nSamples
stereo: DIM: nSamples x nChannels
fs: (int) Sample rate in Hz
nBits: (int) Number of bits
fnWAV: (string) WAV file name to write
"""
if nbits == 8:
intsamples = (y+1.0) * AudioIO.normFact['int' + str(nbits)]
fX = np.int8(intsamples)
elif nbits == 16:
intsamples = y * AudioIO.normFact['int' + str(nbits)]
fX = np.int16(intsamples)
elif nbits > 16:
fX = y
write(audioFile, fs, fX)
def interpret_header(self):
"""redefine variables from header dictionary"""
self.nifs = self.header['nifs']
self.nchans = self.header['nchans']
self.nbits = self.header['nbits']
signed = 'signed' in self.header and self.header['signed'] is True
if self.nbits >= 8:
if signed:
self.dtype = {8: np.int8,
16: np.int16,
32: np.float32,
64: np.float64}[self.nbits]
else:
self.dtype = {8: np.uint8,
16: np.uint16,
32: np.float32,
64: np.float64}[self.nbits]
else:
self.dtype = np.int8 if signed else np.uint8
def numpy2bifrost(dtype):
if dtype == np.int8: return _bf.BF_DTYPE_I8
elif dtype == np.int16: return _bf.BF_DTYPE_I16
elif dtype == np.int32: return _bf.BF_DTYPE_I32
elif dtype == np.uint8: return _bf.BF_DTYPE_U8
elif dtype == np.uint16: return _bf.BF_DTYPE_U16
elif dtype == np.uint32: return _bf.BF_DTYPE_U32
elif dtype == np.float16: return _bf.BF_DTYPE_F16
elif dtype == np.float32: return _bf.BF_DTYPE_F32
elif dtype == np.float64: return _bf.BF_DTYPE_F64
elif dtype == np.float128: return _bf.BF_DTYPE_F128
elif dtype == ci8: return _bf.BF_DTYPE_CI8
elif dtype == ci16: return _bf.BF_DTYPE_CI16
elif dtype == ci32: return _bf.BF_DTYPE_CI32
elif dtype == cf16: return _bf.BF_DTYPE_CF16
elif dtype == np.complex64: return _bf.BF_DTYPE_CF32
elif dtype == np.complex128: return _bf.BF_DTYPE_CF64
elif dtype == np.complex256: return _bf.BF_DTYPE_CF128
else: raise ValueError("Unsupported dtype: " + str(dtype))
def numpy2string(dtype):
if dtype == np.int8: return 'i8'
elif dtype == np.int16: return 'i16'
elif dtype == np.int32: return 'i32'
elif dtype == np.int64: return 'i64'
elif dtype == np.uint8: return 'u8'
elif dtype == np.uint16: return 'u16'
elif dtype == np.uint32: return 'u32'
elif dtype == np.uint64: return 'u64'
elif dtype == np.float16: return 'f16'
elif dtype == np.float32: return 'f32'
elif dtype == np.float64: return 'f64'
elif dtype == np.float128: return 'f128'
elif dtype == np.complex64: return 'cf32'
elif dtype == np.complex128: return 'cf64'
elif dtype == np.complex256: return 'cf128'
else: raise TypeError("Unsupported dtype: " + str(dtype))
def smooth(tile):
#first use this function to get mean and save it in an array
temp = import_all_year_data(tile)
####after get the mean value for all doy, I will run a bise gapfill first
print temp.size
##when using the single processing
#inputVI = pd.DataFrame(temp)
#VIsmoothed = inputVI.apply(VIsmooth, axis=0)
#VIsmoothed = VIsmoothed.as_matrix()
#VIsmoothed = parallelize_dataframe(temp)
##when using the multiprocessing
VIsmoothed = dataframeapply(temp)
VIsmoothed = VIsmoothed.reshape(VIsmoothed.size/2400/2400, 2400, 2400)
TILEdir = os.path.join(dirref, tile)
if not os.path.exists(TILEdir):
os.makedirs(TILEdir)
export_array (Rasters=np.int16(VIsmoothed), directory=TILEdir, \
prod='EVI.BISE.SG', tile=tile, index=range(1, 369, 8))
temp = None
inputVI = None
VIsmoothed = None
def update_wf_library(filename, pulses, offsets):
"""
Update a H5 waveform library in place give an iterable of (pulseName, pulse)
tuples and offsets into the waveform library.
"""
assert USE_PHASE_OFFSET_INSTRUCTION == False
#load the h5 file
with h5py.File(filename) as FID:
for label, pulse in pulses.items():
#create a new waveform
if pulse.isTimeAmp:
shape = np.repeat(pulse.amp * np.exp(1j * pulse.phase), 4)
else:
shape = pulse.amp * np.exp(1j * pulse.phase) * pulse.shape
try:
length = offsets[label][1]
except KeyError:
print("\t{} not found in offsets so skipping".format(pulse))
continue
for offset in offsets[label][0]:
print("\tUpdating {} at offset {}".format(pulse, offset))
FID['/chan_1/waveforms'][offset:offset + length] = np.int16(
MAX_WAVEFORM_VALUE * shape.real)
FID['/chan_2/waveforms'][offset:offset + length] = np.int16(
MAX_WAVEFORM_VALUE * shape.imag)
def write_field(FID, fieldName, data, dataType):
typeSizes = {'int16': 2, 'int32': 4, 'double': 8, 'uint128': 16}
formatChars = {'int16': '<h', 'int32': '<i', 'double': '<d'}
if dataType == 'char':
dataSize = len(data) + 1
data = data + chr(0)
else:
dataSize = typeSizes[dataType]
FID.write(struct.pack('<II', len(fieldName) + 1, dataSize))
FID.write(fieldName + chr(0))
if dataType == 'char':
FID.write(data)
elif dataType == 'uint128':
#struct doesn't support uint128 so write two 64bits
#there are smarter ways but we really only need this for the fake timestamp
FID.write(struct.pack('<QQ', 0, data))
else:
FID.write(struct.pack(formatChars[dataType], data))
def write_waveform(FID, WFname, WFnumber, data):
'''
Helper function to write a waveform
'''
numString = str(WFnumber)
write_field(FID, 'WAVEFORM_NAME_' + numString, WFname, 'char')
#Set integer format
write_field(FID, 'WAVEFORM_TYPE_' + numString, 1, 'int16')
write_field(FID, 'WAVEFORM_LENGTH_' + numString, data.size, 'int32')
write_field(FID, 'WAVEFORM_TIMESTAMP_' + numString, 0, 'uint128')
tmpString = 'WAVEFORM_DATA_' + numString + chr(0)
dataSize = 2 * data.size
FID.write(struct.pack('<II', len(tmpString), dataSize))
FID.write(tmpString)
FID.write(data.tostring())
def read_dicom(self, dcm):
""" Imports CT-images from Dicom object.
:param Dicom dcm: a Dicom object
"""
if "images" not in dcm:
raise InputError("Data doesn't contain ct data")
if not self.header_set:
self._set_header_from_dicom(dcm)
self.cube = np.zeros((self.dimz, self.dimy, self.dimx), dtype=np.int16)
intersect = float(dcm["images"][0].RescaleIntercept)
slope = float(dcm["images"][0].RescaleSlope)
for i in range(len(dcm["images"])):
data = np.array(dcm["images"][i].pixel_array) * slope + intersect
self.cube[i][:][:] = data
if self.slice_pos[1] < self.slice_pos[0]:
self.slice_pos.reverse()
self.zoffset = self.slice_pos[0]
self.cube = self.cube[::-1]
def set_data_type(self, type):
""" Sets the data type for the TRiP98 header files.
:param numpy.type type: numpy type, e.g. np.uint16
"""
if type is np.int8 or type is np.uint8:
self.data_type = "integer"
self.num_bytes = 1
elif type is np.int16 or type is np.uint16:
self.data_type = "integer"
self.num_bytes = 2
elif type is np.int32 or type is np.uint32:
self.data_type = "integer"
self.num_bytes = 4
elif type is np.float:
self.data_type = "float"
self.num_bytes = 4
elif type is np.double:
self.data_type = "double"
self.num_bytes = 8
# ###################### WRITING DICOM FILES #######################################
def load_data(predictions_file, labels_file):
'''
Loads prediction and label data into numpy arrays
Parameters
----------
predictions_file: str
Path to the prediction file
labels_file: str
Path to the label file
Returns
-------
ret_val: tuple
labels array, predictions array
'''
labels = io.load_nparray_from_bin_file(labels_file, np.uint8)
predictions = io.load_nparray_from_bin_file(predictions_file, np.int16)
return labels, predictions
def render_fonts_image(x, path, img_per_row, unit_scale=True):
if unit_scale:
# scale 0-1 matrix back to gray scale bitmaps
bitmaps = (x * 255.).astype(dtype=np.int16) % 256
else:
bitmaps = x
num_imgs, h, w = x.shape
width = img_per_row * w
height = int(np.ceil(float(num_imgs) / img_per_row)) * h
canvas = np.zeros(shape=(height, width), dtype=np.int16)
# make the canvas all white
canvas.fill(0)
for idx, bm in enumerate(bitmaps):
x = h * int(idx / img_per_row)
y = w * int(idx % img_per_row)
canvas[x: x + h, y: y + w] = bm
scipy.misc.toimage(canvas).save(path)
return path
def _typename(t):
if t == np.float16:
return 'float16'
elif t == np.float32:
return 'float32'
elif t == np.float64:
return 'float64'
elif t == np.uint8:
return 'uint8'
elif t == np.uint16:
return 'uint16'
elif t == np.int16:
return 'int16'
elif t == np.int32:
return 'int32'
elif t == np.int64:
return 'int64'
else:
raise TypeError('unknown type')
def default(self, obj):
# convert dates and numpy objects in a json serializable format
if isinstance(obj, datetime):
return obj.strftime('%Y-%m-%dT%H:%M:%SZ')
elif isinstance(obj, date):
return obj.strftime('%Y-%m-%d')
elif type(obj) in (np.int_, np.intc, np.intp, np.int8, np.int16,
np.int32, np.int64, np.uint8, np.uint16,
np.uint32, np.uint64):
return int(obj)
elif type(obj) in (np.bool_,):
return bool(obj)
elif type(obj) in (np.float_, np.float16, np.float32, np.float64,
np.complex_, np.complex64, np.complex128):
return float(obj)
# Let the base class default method raise the TypeError
return json.JSONEncoder.default(self, obj)
def convert_dtype(dtype):
if dtype == np.float32:
return dt.DT_FLOAT
elif dtype == np.float64:
return dt.DT_DOUBLE
elif dtype == np.int32:
return dt.DT_INT32
elif dtype == np.uint8:
return dt.DT_UINT8
elif dtype == np.int16:
return dt.DT_INT16
elif dtype == np.int8:
return dt.DT_INT8
elif dtype == np.dtype('S1'):
return dt.DT_STRING
else:
raise ValueError('Unsupported type.')
def get_audio_from_model(model, sr, duration, seed_audio):
print 'Generating audio...'
new_audio = np.zeros((sr * duration))
curr_sample_idx = 0
while curr_sample_idx < new_audio.shape[0]:
distribution = np.array(model.predict(seed_audio.reshape(1,
frame_size, 1)
), dtype=float).reshape(256)
distribution /= distribution.sum().astype(float)
predicted_val = np.random.choice(range(256), p=distribution)
ampl_val_8 = ((((predicted_val) / 255.0) - 0.5) * 2.0)
ampl_val_16 = (np.sign(ampl_val_8) * (1/256.0) * ((1 + 256.0)**abs(
ampl_val_8) - 1)) * 2**15
new_audio[curr_sample_idx] = ampl_val_16
seed_audio[-1] = ampl_val_16
seed_audio[:-1] = seed_audio[1:]
pc_str = str(round(100*curr_sample_idx/float(new_audio.shape[0]), 2))
sys.stdout.write('Percent complete: ' + pc_str + '\r')
sys.stdout.flush()
curr_sample_idx += 1
print 'Audio generated.'
return new_audio.astype(np.int16)
def to_volume(slices):
"""Creates ndarray volume in Hounsfield units (HU) from array of pydicom slices.
"""
volume = np.stack([s.pixel_array for s in slices])
volume = volume.astype(np.int16)
# Set outside-of-scan pixels to 0
# The intercept is usually -1024, so air is approximately 0
volume[volume == -2000] = 0
# Convert to Hounsfield units (HU)
for n in range(len(slices)):
intercept = slices[n].RescaleIntercept
slope = slices[n].RescaleSlope
if slope != 1:
volume[n] = slope * volume[n].astype(np.float64)
volume[n] = volume[n].astype(np.int16)
volume[n] += np.int16(intercept)
volume = np.array(volume, dtype=np.int16)
spacing = tuple(map(float, ([slices[0].SliceThickness] + slices[0].PixelSpacing)))
return volume, spacing
def to_volume(slices):
"""Creates ndarray volume in Hounsfield units (HU) from array of pydicom slices.
"""
volume = np.stack([s.pixel_array for s in slices])
volume = volume.astype(np.int16)
# Set outside-of-scan pixels to 0
# The intercept is usually -1024, so air is approximately 0
volume[volume == -2000] = 0
# Convert to Hounsfield units (HU)
for n in range(len(slices)):
intercept = slices[n].RescaleIntercept
slope = slices[n].RescaleSlope
if slope != 1:
volume[n] = slope * volume[n].astype(np.float64)
volume[n] = volume[n].astype(np.int16)
volume[n] += np.int16(intercept)
volume = np.array(volume, dtype=np.int16)
spacing = tuple(map(float, ([slices[0].SliceThickness] + slices[0].PixelSpacing)))
return volume, spacing
def test_cell_indices_in_tile(self):
"""
Test get_cell_indices_in_tile by filling an int array for a tile,
using the indices returned by cell_indices_in_tile for each cell
in the tile. The array should be fully filled with 1 at the end
"""
h, v = (20, 11)
grid = MODISGrid()
tile_data = np.zeros(
(MODISGrid.MODIS_tile_height, MODISGrid.MODIS_tile_width),
dtype=np.int16)
cells = grid.get_cells_for_tile(h, v)
for cell in cells:
i_range, j_range = grid.get_cell_indices_in_tile(cell, h, v)
tile_data[i_range[0]:i_range[1], j_range[0]:j_range[1]] += 1
# If tile_data contains some zeros, this means the tile is not
# fully covered by the cells. If it contains values > 1, this means
# than more than one cell covers a given tile pixel
assert_array_equal(tile_data, np.ones_like(tile_data))