def _write_digital_u_16(
        task_handle, write_array, num_samps_per_chan, auto_start, timeout,
        data_layout=FillMode.GROUP_BY_CHANNEL):
    """Call ``DAQmxWriteDigitalU16`` and return the samples written per channel.

    Args:
        task_handle: Task handle forwarded as the first C argument.
        write_array: ``numpy.uint16`` array handed to the driver; the declared
            argument type requires the C-contiguous and writeable flags.
        num_samps_per_chan: Samples-per-channel argument of the C call.
        auto_start: Auto-start flag (declared as ``c_bool32``).
        timeout: Timeout forwarded as a C double.
        data_layout: ``FillMode`` member whose ``.value`` is sent to the driver.

    Returns:
        The value the driver stored in the samples-per-channel out-parameter.
    """
    c_func = lib_importer.windll.DAQmxWriteDigitalU16
    # The C signature is declared lazily on first use. The check is repeated
    # while holding the function's lock so it is only assigned once.
    if c_func.argtypes is None:
        with c_func.arglock:
            if c_func.argtypes is None:
                uint16_buffer = wrapped_ndpointer(
                    dtype=numpy.uint16, flags=('C', 'W'))
                c_func.argtypes = [
                    lib_importer.task_handle,
                    ctypes.c_int,
                    c_bool32,
                    ctypes.c_double,
                    ctypes.c_int,
                    uint16_buffer,
                    ctypes.POINTER(ctypes.c_int),
                    ctypes.POINTER(c_bool32),
                ]

    written = ctypes.c_int()
    status = c_func(
        task_handle, num_samps_per_chan, auto_start, timeout,
        data_layout.value, write_array, ctypes.byref(written), None)
    # The status code is handed to the shared error checker before returning.
    check_for_error(status)
    return written.value
# Example source code using Python's uint16()
def _write_binary_u_16(
        task_handle, write_array, num_samps_per_chan, auto_start, timeout,
        data_layout=FillMode.GROUP_BY_CHANNEL):
    """Call ``DAQmxWriteBinaryU16`` and return the samples written per channel.

    Args:
        task_handle: Task handle forwarded as the first C argument.
        write_array: ``numpy.uint16`` array handed to the driver; the declared
            argument type requires the C-contiguous and writeable flags.
        num_samps_per_chan: Samples-per-channel argument of the C call.
        auto_start: Auto-start flag (declared as ``c_bool32``).
        timeout: Timeout forwarded as a C double.
        data_layout: ``FillMode`` member whose ``.value`` is sent to the driver.

    Returns:
        The value the driver stored in the samples-per-channel out-parameter.
    """
    c_func = lib_importer.windll.DAQmxWriteBinaryU16
    # The C signature is declared lazily on first use. The check is repeated
    # while holding the function's lock so it is only assigned once.
    if c_func.argtypes is None:
        with c_func.arglock:
            if c_func.argtypes is None:
                uint16_buffer = wrapped_ndpointer(
                    dtype=numpy.uint16, flags=('C', 'W'))
                c_func.argtypes = [
                    lib_importer.task_handle,
                    ctypes.c_int,
                    c_bool32,
                    ctypes.c_double,
                    ctypes.c_int,
                    uint16_buffer,
                    ctypes.POINTER(ctypes.c_int),
                    ctypes.POINTER(c_bool32),
                ]

    written = ctypes.c_int()
    status = c_func(
        task_handle, num_samps_per_chan, auto_start, timeout,
        data_layout.value, write_array, ctypes.byref(written), None)
    # The status code is handed to the shared error checker before returning.
    check_for_error(status)
    return written.value
def _read_binary_u_16(
        task_handle, read_array, num_samps_per_chan, timeout,
        fill_mode=FillMode.GROUP_BY_CHANNEL):
    """Call ``DAQmxReadBinaryU16`` and return the samples read per channel.

    Args:
        task_handle: Task handle forwarded as the first C argument.
        read_array: ``numpy.uint16`` array passed to the driver as the
            destination buffer; the declared argument type requires the
            C-contiguous and writeable flags.
        num_samps_per_chan: Samples-per-channel argument of the C call.
        timeout: Timeout forwarded as a C double.
        fill_mode: ``FillMode`` member whose ``.value`` is sent to the driver.

    Returns:
        The value the driver stored in the samples-per-channel out-parameter.
    """
    c_func = lib_importer.windll.DAQmxReadBinaryU16
    # The C signature is declared lazily on first use. The check is repeated
    # while holding the function's lock so it is only assigned once.
    if c_func.argtypes is None:
        with c_func.arglock:
            if c_func.argtypes is None:
                uint16_buffer = wrapped_ndpointer(
                    dtype=numpy.uint16, flags=('C', 'W'))
                c_func.argtypes = [
                    lib_importer.task_handle,
                    ctypes.c_int,
                    ctypes.c_double,
                    ctypes.c_int,
                    uint16_buffer,
                    ctypes.c_uint,
                    ctypes.POINTER(ctypes.c_int),
                    ctypes.POINTER(c_bool32),
                ]

    samples_read = ctypes.c_int()
    # The total element count of the buffer is passed as the array-size argument.
    status = c_func(
        task_handle, num_samps_per_chan, timeout, fill_mode.value,
        read_array, numpy.prod(read_array.shape),
        ctypes.byref(samples_read), None)
    check_for_error(status)
    return samples_read.value
def _read_digital_u_16(
        task_handle, read_array, num_samps_per_chan, timeout,
        fill_mode=FillMode.GROUP_BY_CHANNEL):
    """Call ``DAQmxReadDigitalU16`` and return the samples read per channel.

    Args:
        task_handle: Task handle forwarded as the first C argument.
        read_array: ``numpy.uint16`` array passed to the driver as the
            destination buffer; the declared argument type requires the
            C-contiguous and writeable flags.
        num_samps_per_chan: Samples-per-channel argument of the C call.
        timeout: Timeout forwarded as a C double.
        fill_mode: ``FillMode`` member whose ``.value`` is sent to the driver.

    Returns:
        The value the driver stored in the samples-per-channel out-parameter.
    """
    c_func = lib_importer.windll.DAQmxReadDigitalU16
    # The C signature is declared lazily on first use. The check is repeated
    # while holding the function's lock so it is only assigned once.
    if c_func.argtypes is None:
        with c_func.arglock:
            if c_func.argtypes is None:
                uint16_buffer = wrapped_ndpointer(
                    dtype=numpy.uint16, flags=('C', 'W'))
                c_func.argtypes = [
                    lib_importer.task_handle,
                    ctypes.c_int,
                    ctypes.c_double,
                    ctypes.c_int,
                    uint16_buffer,
                    ctypes.c_uint,
                    ctypes.POINTER(ctypes.c_int),
                    ctypes.POINTER(c_bool32),
                ]

    samples_read = ctypes.c_int()
    # The total element count of the buffer is passed as the array-size argument.
    status = c_func(
        task_handle, num_samps_per_chan, timeout, fill_mode.value,
        read_array, numpy.prod(read_array.shape),
        ctypes.byref(samples_read), None)
    check_for_error(status)
    return samples_read.value
def _get_dtype_maps():
""" Get dictionaries to map numpy data types to ITK types and the
other way around.
"""
# Define pairs
tmp = [ (np.float32, 'MET_FLOAT'), (np.float64, 'MET_DOUBLE'),
(np.uint8, 'MET_UCHAR'), (np.int8, 'MET_CHAR'),
(np.uint16, 'MET_USHORT'), (np.int16, 'MET_SHORT'),
(np.uint32, 'MET_UINT'), (np.int32, 'MET_INT'),
(np.uint64, 'MET_ULONG'), (np.int64, 'MET_LONG') ]
# Create dictionaries
map1, map2 = {}, {}
for np_type, itk_type in tmp:
map1[np_type.__name__] = itk_type
map2[itk_type] = np_type.__name__
# Done
return map1, map2
def unpack_packet(stream, vertex_type, size):
    """Decode the primitives stored in a packet of ``size`` bytes.

    The packet is read from ``stream`` in one call and then walked in memory.
    A zero opcode byte is skipped. Any other opcode starts a primitive: the
    vertex count is unpacked at the byte after the opcode, and the vertex
    data begins three bytes after the opcode.

    Args:
        stream: Object providing ``read(size)``.
        vertex_type: numpy dtype describing one vertex; its ``itemsize`` is
            used to advance past the vertex data.
        size: Number of bytes that make up the packet.

    Returns:
        list: ``Primitive`` objects in the order they appear in the packet.
    """
    # Read everything up front for speed instead of issuing many small reads.
    packet = stream.read(size)
    decoded = []
    offset = 0
    while offset < size:
        opcode = packet[offset]
        if opcode != 0x00:
            kind = gx.PrimitiveType(opcode)
            count = uint16.unpack_from(packet, offset + 1)
            vertices = numpy.frombuffer(packet, vertex_type, count, offset + 3)
            decoded.append(Primitive(kind, vertices))
            offset += 3 + count*vertex_type.itemsize
        else:
            # Zero opcode: step over the single byte.
            offset += 1
    return decoded
def iteritems(self, every_k_frames=1):
    """Yield one full-size frame (image, depth, mask, bbox) per sampled item.

    Each cropped RGB/depth/mask image is pasted into a zero-filled canvas of
    the dataset's default shape, with its top-left corner at ``loc``
    (``loc[0]`` is used as the column offset, ``loc[1]`` as the row offset).

    Args:
        every_k_frames: Sampling step forwarded to the underlying iterators
            and used to stride ``self.locations``.

    Yields:
        AttrDict with ``img``, ``depth``, ``mask`` and a single-element
        ``bbox`` list.
    """
    frames = izip(
        self.rgb.iteritems(every_k_frames=every_k_frames),
        self.depth.iteritems(every_k_frames=every_k_frames),
        self.mask.iteritems(every_k_frames=every_k_frames),
        self.locations[::every_k_frames])

    for rgb_im, depth_im, mask_im, loc in frames:
        col0, row0 = loc[0], loc[1]

        rgb = np.zeros(shape=UWRGBDObjectDataset.default_rgb_shape, dtype=np.uint8)
        depth = np.zeros(shape=UWRGBDObjectDataset.default_depth_shape, dtype=np.uint16)
        mask = np.zeros(shape=UWRGBDObjectDataset.default_depth_shape, dtype=np.uint8)

        rgb[row0:row0+rgb_im.shape[0], col0:col0+rgb_im.shape[1]] = rgb_im
        depth[row0:row0+depth_im.shape[0], col0:col0+depth_im.shape[1]] = depth_im
        mask[row0:row0+mask_im.shape[0], col0:col0+mask_im.shape[1]] = mask_im

        # Only a single bbox per image; its extent comes from the mask crop.
        box = AttrDict(
            coords=np.float32([col0, row0,
                               col0+mask_im.shape[1],
                               row0+mask_im.shape[0]]),
            target=self.target,
            category=UWRGBDDataset.get_category_name(self.target),
            instance=self.instance)
        yield AttrDict(img=rgb, depth=depth, mask=mask, bbox=[box])
def test_rescaleData():
    """Check ``pg.rescaleData`` against a numpy reference for dtype pairs.

    For every (input dtype, output dtype) combination and several
    scale/offset pairs, the reference result is
    ``clip(scale * (data - offset))`` cast to the output dtype. Integer
    outputs must match exactly; floating outputs must be close.
    """
    # Materialise the dtypes as a list: on Python 3 ``map`` returns a one-shot
    # iterator, so the nested loops below would exhaust it and silently skip
    # most dtype combinations.
    dtypes = list(map(np.dtype, ('ubyte', 'uint16', 'byte', 'int16', 'int', 'float')))
    cases = [(10, 0), (10., 0.), (1, -50), (0.2, 0.5), (0.001, 0)]
    for dtype1 in dtypes:
        for dtype2 in dtypes:
            data = (np.random.random(size=10) * 2**32 - 2**31).astype(dtype1)
            for scale, offset in cases:
                if dtype2.kind in 'iu':
                    # Integer outputs are clipped to the representable range.
                    lim = np.iinfo(dtype2)
                    lim = lim.min, lim.max
                else:
                    lim = (-np.inf, np.inf)
                s1 = np.clip(float(scale) * (data-float(offset)), *lim).astype(dtype2)
                s2 = pg.rescaleData(data, scale, offset, dtype2)
                assert s1.dtype == s2.dtype
                if dtype2.kind in 'iu':
                    assert np.all(s1 == s2)
                else:
                    assert np.allclose(s1, s2)
def test_rescaleData():
    """Check ``pg.rescaleData`` against a numpy reference for dtype pairs.

    For every (input dtype, output dtype) combination and several
    scale/offset pairs, the reference result is
    ``clip(scale * (data - offset))`` cast to the output dtype. Integer
    outputs must match exactly; floating outputs must be close.
    """
    # Materialise the dtypes as a list: on Python 3 ``map`` returns a one-shot
    # iterator, so the nested loops below would exhaust it and silently skip
    # most dtype combinations.
    dtypes = list(map(np.dtype, ('ubyte', 'uint16', 'byte', 'int16', 'int', 'float')))
    cases = [(10, 0), (10., 0.), (1, -50), (0.2, 0.5), (0.001, 0)]
    for dtype1 in dtypes:
        for dtype2 in dtypes:
            data = (np.random.random(size=10) * 2**32 - 2**31).astype(dtype1)
            for scale, offset in cases:
                if dtype2.kind in 'iu':
                    # Integer outputs are clipped to the representable range.
                    lim = np.iinfo(dtype2)
                    lim = lim.min, lim.max
                else:
                    lim = (-np.inf, np.inf)
                s1 = np.clip(float(scale) * (data-float(offset)), *lim).astype(dtype2)
                s2 = pg.rescaleData(data, scale, offset, dtype2)
                assert s1.dtype == s2.dtype
                if dtype2.kind in 'iu':
                    assert np.all(s1 == s2)
                else:
                    assert np.allclose(s1, s2)
def __init__(self, *args, **kwds):
    """Prepare the destination dtypes and source surfaces used by the tests.

    ``dst_types`` holds the unsigned numpy integer types; ``uint64`` is
    added only if numpy exposes it. The display module is initialised just
    long enough to build the source surfaces and is always shut down again.
    """
    import numpy

    self.dst_types = [numpy.uint8, numpy.uint16, numpy.uint32]
    widest = getattr(numpy, 'uint64', None)
    if widest is not None:
        self.dst_types.append(widest)

    pygame.display.init()
    try:
        unittest.TestCase.__init__(self, *args, **kwds)
        make = self._make_src_surface
        # One surface per bit depth, plus alpha variants for 16 and 32 bits.
        self.sources = [
            make(8),
            make(16),
            make(16, srcalpha=True),
            make(24),
            make(32),
            make(32, srcalpha=True),
        ]
    finally:
        pygame.display.quit()
def array2d(surface):
    """pygame.numpyarray.array2d(Surface): return array

    Copy pixels into a 2d array.

    Copy the pixels from a Surface into a 2D array. The byte size of the
    surface's pixels selects the integer type of the array: 1 byte gives
    ``uint8``, 2 bytes ``uint16``, 3 or 4 bytes ``int32``.

    The copy itself is delegated to ``surface_to_array``; the original
    documentation states that the Surface is temporarily locked while the
    pixels are copied (see Surface.lock).

    Raises:
        ValueError: If the surface reports a byte size outside 1..4.
    """
    dtypes_by_bytesize = (numpy.uint8, numpy.uint16, numpy.int32, numpy.int32)
    bpp = surface.get_bytesize()
    # Check the range explicitly: plain tuple indexing would let bpp <= 0
    # wrap around to a negative index and silently select int32.
    if not 1 <= bpp <= len(dtypes_by_bytesize):
        raise ValueError("unsupported bit depth %i for 2D array" % (bpp * 8,))
    dtype = dtypes_by_bytesize[bpp - 1]
    array = numpy.empty(surface.get_size(), dtype)
    surface_to_array(array, surface)
    return array
def create_empty_copy(self, source_filename, destination_filename):
    """
    Write an all-zero classification image with the shape of an existing one.

    The source image is opened only to obtain its shape and its 'map info'
    metadata entry; the zero-filled uint16 array is then saved as an ENVI
    classification with the classes 'No data' and 'Data'.

    Args:
        source_filename (str): filename of the source image
        destination_filename (str): filename of the destination image
    """
    names = (source_filename, destination_filename)
    logging.info("Creating an empty copy of classified image '%s' with the same size. Saving to '%s'" % names)

    source = spectral.open_image(source_filename)

    # Same dimensions as the source, every pixel zero.
    blank = numpy.zeros(shape=source.shape, dtype=numpy.uint16)

    # Carry the source's map info over; zero is recorded as the ignore value.
    metadata = {
        'data ignore value': 0,
        'map info': source.metadata.get('map info'),
    }
    spectral.io.envi.save_classification(
        destination_filename,
        blank,
        class_names=['No data', 'Data'],
        metadata=metadata)
def _check_valid_data(self, data):
"""Checks that the incoming data is a 2 x #elements ndarray of ints.
Parameters
----------
data : :obj:`numpy.ndarray`
The data to verify.
Raises
------
ValueError
If the data is not of the correct shape or type.
"""
if data.dtype.type != np.int8 and data.dtype.type != np.int16 \
and data.dtype.type != np.int32 and data.dtype.type != np.int64 \
and data.dtype.type != np.uint8 and data.dtype.type != np.uint16 \
and data.dtype.type != np.uint32 and data.dtype.type != np.uint64:
raise ValueError('Must initialize image coords with a numpy int ndarray')
if data.shape[0] != 2:
raise ValueError('Illegal data array passed to image coords. Must have 2 coordinates')
if len(data.shape) > 2:
raise ValueError('Illegal data array passed to point cloud. Must have 1 or 2 dimensions')
def to_best_type(array):
    '''Convert array to the narrowest unsigned integer type that holds its maximum.

    The candidate types are tried from ``uint8`` up to ``uint64`` and the
    first one whose maximum is not exceeded by ``array.max()`` is used.

    Only the maximum is inspected, so negative values are cast as-is
    (they are not range-checked).

    Raises:
        ValueError: If the maximum fits none of the unsigned types, e.g. it
            is NaN or larger than the ``uint64`` range.
    '''
    # Compute the reduction once instead of once per comparison.
    peak = array.max()
    for candidate in (np.uint8, np.uint16, np.uint32, np.uint64):
        if peak <= np.iinfo(candidate).max:
            return array.astype(candidate)
    # Without this, the function would fail with an unbound local variable.
    raise ValueError(
        'array maximum %r does not fit any unsigned integer type' % (peak,))
def _gene_embed_space(self,vec):
shape = vec.shape
vec = vec.flatten()
combo_neg_idx = np.array([1 if vec[i]<0 else 0 for i in range(len(vec))])
vec_pos = np.abs(vec)
int_part = np.floor(vec_pos)
frac_part = np.round(vec_pos - int_part,2)
bi_int_part=[] #?????????????signature???????
for i in range(len(int_part)):
bi=list(bin(int(int_part[i]))[2:])
bie = [0] * (16 - len(bi))
bie.extend(bi)
bi_int_part.append(np.array(bie,dtype=np.uint16))
bi_int_part = np.array(bi_int_part)
sig = []
for i in range(len(bi_int_part)):
sig.append(bi_int_part[i][10])
sig = np.array(sig).reshape(shape)
return np.array(bi_int_part),frac_part.reshape(shape),combo_neg_idx.reshape(shape),sig
def _gene_embed_space(self,vec):
shape = vec.shape
vec = vec.flatten()
combo_neg_idx = np.array([1 if vec[i]<0 else 0 for i in range(len(vec))])
vec_pos = np.abs(vec)
int_part = np.floor(vec_pos)
frac_part = np.round(vec_pos - int_part,2)
bi_int_part=[] #?????????????signature???????
for i in range(len(int_part)):
bi=list(bin(int(int_part[i]))[2:])
bie = [0] * (16 - len(bi))
bie.extend(bi)
bi_int_part.append(np.array(bie,dtype=np.uint16))
bi_int_part = np.array(bi_int_part)
sig = []
for i in range(len(bi_int_part)):
sig.append(bi_int_part[i][10])
sig = np.array(sig).reshape(shape)
return np.array(bi_int_part),frac_part.reshape(shape),combo_neg_idx.reshape(shape),sig
def test_int(self):
for st, ut, s in [(np.int8, np.uint8, 8),
(np.int16, np.uint16, 16),
(np.int32, np.uint32, 32),
(np.int64, np.uint64, 64)]:
for i in range(1, s):
assert_equal(hash(st(-2**i)), hash(-2**i),
err_msg="%r: -2**%d" % (st, i))
assert_equal(hash(st(2**(i - 1))), hash(2**(i - 1)),
err_msg="%r: 2**%d" % (st, i - 1))
assert_equal(hash(st(2**i - 1)), hash(2**i - 1),
err_msg="%r: 2**%d - 1" % (st, i))
i = max(i - 1, 1)
assert_equal(hash(ut(2**(i - 1))), hash(2**(i - 1)),
err_msg="%r: 2**%d" % (ut, i - 1))
assert_equal(hash(ut(2**i - 1)), hash(2**i - 1),
err_msg="%r: 2**%d - 1" % (ut, i))
def setUp(self):
    """Build the float16 fixture arrays and their float32/float64 copies.

    Attributes set:
        all_f16 / all_f32 / all_f64: every 16-bit pattern 0x0000..0xffff
            reinterpreted as float16, plus conversions to wider floats.
        nonan_f16 / nonan_f32 / nonan_f64: the patterns from 0xfc00 down to
            0x8000 followed by 0x0000 up to 0x7c00 (the source comment
            describes this as all non-NaN values in sorted order).
        finite_f16 / finite_f32 / finite_f64: the ``nonan`` arrays without
            their first and last elements.
    """
    # Every possible bit pattern, reinterpreted (not converted) as float16.
    every_pattern = np.arange(0x10000, dtype=uint16)
    self.all_f16 = every_pattern.view(float16)
    self.all_f32 = np.array(self.all_f16, dtype=float32)
    self.all_f64 = np.array(self.all_f16, dtype=float64)

    # Descending run over the sign-bit-set patterns, then ascending run
    # over the sign-bit-clear patterns.
    descending = np.arange(0xfc00, 0x7fff, -1, dtype=uint16)
    ascending = np.arange(0x0000, 0x7c01, 1, dtype=uint16)
    self.nonan_f16 = np.concatenate((descending, ascending)).view(float16)
    self.nonan_f32 = np.array(self.nonan_f16, dtype=float32)
    self.nonan_f64 = np.array(self.nonan_f16, dtype=float64)

    # Dropping both ends removes the first and last entries of each array.
    inner = slice(1, -1)
    self.finite_f16 = self.nonan_f16[inner]
    self.finite_f32 = self.nonan_f32[inner]
    self.finite_f64 = self.nonan_f64[inner]
def test_half_values(self):
"""Confirms a small number of known half values"""
a = np.array([1.0, -1.0,
2.0, -2.0,
0.0999755859375, 0.333251953125, # 1/10, 1/3
65504, -65504, # Maximum magnitude
2.0**(-14), -2.0**(-14), # Minimum normal
2.0**(-24), -2.0**(-24), # Minimum subnormal
0, -1/1e1000, # Signed zeros
np.inf, -np.inf])
b = np.array([0x3c00, 0xbc00,
0x4000, 0xc000,
0x2e66, 0x3555,
0x7bff, 0xfbff,
0x0400, 0x8400,
0x0001, 0x8001,
0x0000, 0x8000,
0x7c00, 0xfc00], dtype=uint16)
b.dtype = float16
assert_equal(a, b)
def test_spacing_nextafter(self):
"""Test np.spacing and np.nextafter"""
# All non-negative finite #'s
a = np.arange(0x7c00, dtype=uint16)
hinf = np.array((np.inf,), dtype=float16)
a_f16 = a.view(dtype=float16)
assert_equal(np.spacing(a_f16[:-1]), a_f16[1:]-a_f16[:-1])
assert_equal(np.nextafter(a_f16[:-1], hinf), a_f16[1:])
assert_equal(np.nextafter(a_f16[0], -hinf), -a_f16[1])
assert_equal(np.nextafter(a_f16[1:], -hinf), a_f16[:-1])
# switch to negatives
a |= 0x8000
assert_equal(np.spacing(a_f16[0]), np.spacing(a_f16[1]))
assert_equal(np.spacing(a_f16[1:]), a_f16[:-1]-a_f16[1:])
assert_equal(np.nextafter(a_f16[0], hinf), -a_f16[1])
assert_equal(np.nextafter(a_f16[1:], hinf), a_f16[:-1])
assert_equal(np.nextafter(a_f16[:-1], -hinf), a_f16[1:])
def test_basic(self):
ba = [1, 2, 10, 11, 6, 5, 4]
ba2 = [[1, 2, 3, 4], [5, 6, 7, 9], [10, 3, 4, 5]]
for ctype in [np.int8, np.uint8, np.int16, np.uint16, np.int32,
np.uint32, np.float32, np.float64, np.complex64, np.complex128]:
a = np.array(ba, ctype)
a2 = np.array(ba2, ctype)
tgt = np.array([1, 3, 13, 24, 30, 35, 39], ctype)
assert_array_equal(np.cumsum(a, axis=0), tgt)
tgt = np.array(
[[1, 2, 3, 4], [6, 8, 10, 13], [16, 11, 14, 18]], ctype)
assert_array_equal(np.cumsum(a2, axis=0), tgt)
tgt = np.array(
[[1, 3, 6, 10], [5, 11, 18, 27], [10, 13, 17, 22]], ctype)
assert_array_equal(np.cumsum(a2, axis=1), tgt)
def test_basic(self):
ba = [1, 2, 10, 11, 6, 5, 4]
ba2 = [[1, 2, 3, 4], [5, 6, 7, 9], [10, 3, 4, 5]]
for ctype in [np.int16, np.uint16, np.int32, np.uint32,
np.float32, np.float64, np.complex64, np.complex128]:
a = np.array(ba, ctype)
a2 = np.array(ba2, ctype)
if ctype in ['1', 'b']:
self.assertRaises(ArithmeticError, np.prod, a)
self.assertRaises(ArithmeticError, np.prod, a2, 1)
else:
assert_equal(a.prod(axis=0), 26400)
assert_array_equal(a2.prod(axis=0),
np.array([50, 36, 84, 180], ctype))
assert_array_equal(a2.prod(axis=-1),
np.array([24, 1890, 600], ctype))
def test_basic(self):
ba = [1, 2, 10, 11, 6, 5, 4]
ba2 = [[1, 2, 3, 4], [5, 6, 7, 9], [10, 3, 4, 5]]
for ctype in [np.int16, np.uint16, np.int32, np.uint32,
np.float32, np.float64, np.complex64, np.complex128]:
a = np.array(ba, ctype)
a2 = np.array(ba2, ctype)
if ctype in ['1', 'b']:
self.assertRaises(ArithmeticError, np.cumprod, a)
self.assertRaises(ArithmeticError, np.cumprod, a2, 1)
self.assertRaises(ArithmeticError, np.cumprod, a)
else:
assert_array_equal(np.cumprod(a, axis=-1),
np.array([1, 2, 20, 220,
1320, 6600, 26400], ctype))
assert_array_equal(np.cumprod(a2, axis=0),
np.array([[1, 2, 3, 4],
[5, 12, 21, 36],
[50, 36, 84, 180]], ctype))
assert_array_equal(np.cumprod(a2, axis=-1),
np.array([[1, 2, 6, 24],
[5, 30, 210, 1890],
[10, 30, 120, 600]], ctype))
def watershed_cut(depthImage, ssMask):
    """Turn a depth map and a semantic mask into an instance-ID image.

    For every semantic class, pixels of that class whose depth value exceeds
    the class threshold are cleaned up (small objects and small holes
    removed), split into connected components, and each component is
    dilated and written into the result as ``1000 * csCode + componentID``.

    Args:
        depthImage: Array compared against the per-class ``THRESHOLD``.
        ssMask: Semantic mask; compared against the per-class ``CLASS_TO_SS``
            code after being cast to int32.

    Returns:
        numpy.ndarray: uint16 image with the shape of ``ssMask``; 0 where no
        instance was written.
    """
    ssMask = ssMask.astype(np.int32)
    resultImage = np.zeros(shape=ssMask.shape, dtype=np.float32)

    for semClass, csCode in CLASS_TO_CITYSCAPES.items():
        ssCode = CLASS_TO_SS[semClass]
        ssMaskClass = (ssMask == ssCode)

        ccImage = (depthImage > THRESHOLD[semClass]) * ssMaskClass
        ccImage = skimage.morphology.remove_small_objects(ccImage, min_size=MIN_SIZE[semClass])
        ccImage = skimage.morphology.remove_small_holes(ccImage)
        ccLabels = skimage.morphology.label(ccImage)

        # Drop only the background label (0). Slicing off the first unique
        # value would discard a real component whenever the mask contains
        # no background pixel at all.
        # NOTE(review): assumes the labelling routine marks background as 0
        # (current scikit-image behaviour) - confirm for the pinned version.
        ccIDs = np.unique(ccLabels)
        ccIDs = ccIDs[ccIDs != 0]

        for ccID in ccIDs:
            ccIDMask = (ccLabels == ccID)
            ccIDMask = skimage.morphology.binary_dilation(ccIDMask, SELEM[THRESHOLD[semClass]])
            instanceID = 1000 * csCode + ccID
            resultImage[ccIDMask] = instanceID

    resultImage = resultImage.astype(np.uint16)
    return resultImage
def test_process_image(compress, out_dir):
    """Process one 3-channel image and check the per-channel PNG outputs.

    With an all-ones illumination function and percentiles of 0 and 255,
    each written PNG is expected to equal the matching input channel.
    """
    # NOTE(review): scipy.misc.imread was removed in SciPy 1.2 - confirm the
    # SciPy version this test is expected to run against.
    numpy.random.seed(8)
    image = numpy.random.randint(256, size=(16, 16, 3), dtype=numpy.uint16)
    meta = {
        "DNA": "/User/jcaciedo/LUAD/dna.tiff",
        "ER": "/User/jcaciedo/LUAD/er.tiff",
        "Mito": "/User/jcaciedo/LUAD/mito.tiff"
    }

    compress.stats["illum_correction_function"] = numpy.ones((16,16,3))
    compress.stats["upper_percentiles"] = [255, 255, 255]
    compress.stats["lower_percentiles"] = [0, 0, 0]
    compress.process_image(0, image, meta)

    # Exactly one PNG per channel, named after the source file's stem.
    expected_paths = [os.path.join(out_dir, name)
                      for name in ["dna.png", "er.png", "mito.png"]]
    found_paths = sorted(glob.glob(os.path.join(out_dir,"*")))
    assert expected_paths == found_paths

    for channel in range(3):
        written = scipy.misc.imread(found_paths[channel])
        numpy.testing.assert_array_equal(image[:,:,channel], written)
def test_apply(corrector):
    """``corrector.apply`` must divide the image by the illumination function."""
    shape = (24, 24, 3)
    image = numpy.random.randint(256, size=shape, dtype=numpy.uint16)

    # Random correction function, rescaled so its minimum is 1.
    illum_corr_func = numpy.random.rand(*shape)
    illum_corr_func = illum_corr_func / illum_corr_func.min()
    corrector.illum_corr_func = illum_corr_func

    corrected = corrector.apply(image)

    assert corrected.shape == (24, 24, 3)
    numpy.testing.assert_array_equal(corrected, image / illum_corr_func)
def interpret_header(self):
    """Copy the header fields onto the instance and pick the sample dtype.

    ``nifs``, ``nchans`` and ``nbits`` are taken straight from
    ``self.header``. The dtype depends on ``nbits`` and on whether the
    header's ``'signed'`` entry is exactly ``True``:

    * fewer than 8 bits: ``int8`` if signed, else ``uint8``;
    * 8 or 16 bits: the signed or unsigned integer of that width;
    * 32 or 64 bits: ``float32`` / ``float64`` regardless of signedness.

    Raises:
        KeyError: If ``nbits`` is 8 or more but not one of 8, 16, 32, 64.
    """
    header = self.header
    self.nifs = header['nifs']
    self.nchans = header['nchans']
    self.nbits = header['nbits']
    is_signed = 'signed' in header and header['signed'] is True

    if self.nbits < 8:
        # Sub-byte samples are held in a one-byte container type.
        self.dtype = np.int8 if is_signed else np.uint8
        return

    if is_signed:
        byte_type, short_type = np.int8, np.int16
    else:
        byte_type, short_type = np.uint8, np.uint16
    by_width = {
        8: byte_type,
        16: short_type,
        32: np.float32,
        64: np.float64,
    }
    self.dtype = by_width[self.nbits]
def numpy2bifrost(dtype):
    """Map a numpy (or project-defined complex-integer) dtype to a ``_bf`` constant.

    The comparisons run in the order listed and the first equal entry wins.

    ``np.float128`` and ``np.complex256`` only exist on some platforms, so
    they are compared only when numpy provides them. Referencing them
    unconditionally would raise AttributeError for every dtype listed after
    ``float64`` on platforms that lack them.

    Raises:
        ValueError: If ``dtype`` matches none of the supported types.
    """
    if dtype == np.int8:
        return _bf.BF_DTYPE_I8
    if dtype == np.int16:
        return _bf.BF_DTYPE_I16
    if dtype == np.int32:
        return _bf.BF_DTYPE_I32
    if dtype == np.uint8:
        return _bf.BF_DTYPE_U8
    if dtype == np.uint16:
        return _bf.BF_DTYPE_U16
    if dtype == np.uint32:
        return _bf.BF_DTYPE_U32
    if dtype == np.float16:
        return _bf.BF_DTYPE_F16
    if dtype == np.float32:
        return _bf.BF_DTYPE_F32
    if dtype == np.float64:
        return _bf.BF_DTYPE_F64
    if hasattr(np, 'float128') and dtype == np.float128:
        return _bf.BF_DTYPE_F128
    if dtype == ci8:
        return _bf.BF_DTYPE_CI8
    if dtype == ci16:
        return _bf.BF_DTYPE_CI16
    if dtype == ci32:
        return _bf.BF_DTYPE_CI32
    if dtype == cf16:
        return _bf.BF_DTYPE_CF16
    if dtype == np.complex64:
        return _bf.BF_DTYPE_CF32
    if dtype == np.complex128:
        return _bf.BF_DTYPE_CF64
    if hasattr(np, 'complex256') and dtype == np.complex256:
        return _bf.BF_DTYPE_CF128
    raise ValueError("Unsupported dtype: " + str(dtype))
def numpy2string(dtype):
    """Return the short type code (e.g. ``'u16'``, ``'cf64'``) for a numpy dtype.

    The candidates are compared with ``==`` in the order listed and the first
    equal entry wins, so both numpy type objects and ``np.dtype`` instances
    are accepted.

    Types that numpy does not provide on the current platform (``float128``
    and ``complex256`` are platform-dependent) are skipped. Referencing them
    unconditionally would raise AttributeError for every dtype listed after
    ``float64`` on platforms that lack them.

    Raises:
        TypeError: If ``dtype`` matches none of the supported types.
    """
    codes = (
        ('int8', 'i8'),
        ('int16', 'i16'),
        ('int32', 'i32'),
        ('int64', 'i64'),
        ('uint8', 'u8'),
        ('uint16', 'u16'),
        ('uint32', 'u32'),
        ('uint64', 'u64'),
        ('float16', 'f16'),
        ('float32', 'f32'),
        ('float64', 'f64'),
        ('float128', 'f128'),
        ('complex64', 'cf32'),
        ('complex128', 'cf64'),
        ('complex256', 'cf128'),
    )
    for type_name, code in codes:
        np_type = getattr(np, type_name, None)
        # Skip types this numpy build lacks. The explicit None test also
        # keeps a ``dtype`` of None from matching a missing entry.
        if np_type is not None and dtype == np_type:
            return code
    raise TypeError("Unsupported dtype: " + str(dtype))
def on_data(self, ispan):
    """Process data from from ispans to ospans and return the number of
    frames to commit for each output (or None to commit complete spans).

    This implementation always returns None. Data that is not already uint8
    is rescaled linearly so that its minimum maps to 0 and its maximum to
    255, then truncated to uint8 and written to ``self.outfile``. Constant
    input (zero range) is written as all zeros.

    Only the first call with an open ``self.outfile`` writes anything: the
    file is closed and ``self.outfile`` reset to None afterwards.
    """
    data = ispan.data
    # Function-call form so the module also parses on Python 3.
    print("PgmWriterBlock.on_data()")
    if self.outfile is None:
        # Nothing to write to, so skip the normalisation work entirely.
        return
    # HACK TESTING
    if data.dtype != np.uint8:
        # Widen integer/bool input to float64 first: subtracting the minimum
        # in a narrow integer type can wrap around, and on Python 2 the
        # division would be floor division.
        work = data.astype(np.float64) if data.dtype.kind in 'biu' else data
        lowest = work.min()
        value_range = work.max() - lowest
        if value_range == 0:
            # Constant input: avoid dividing by zero.
            data = np.zeros(data.shape, dtype=np.uint8)
        else:
            data = ((work - lowest) / value_range * 255).astype(np.uint8)
    data.tofile(self.outfile)
    # HACK TESTING only write the first gulp
    self.outfile.close()
    self.outfile = None