def test_dtype_promotion(self):
# datetime <op> datetime computes the metadata gcd
# timedelta <op> timedelta computes the metadata gcd
for mM in ['m', 'M']:
assert_equal(
np.promote_types(np.dtype(mM+'8[2Y]'), np.dtype(mM+'8[2Y]')),
np.dtype(mM+'8[2Y]'))
assert_equal(
np.promote_types(np.dtype(mM+'8[12Y]'), np.dtype(mM+'8[15Y]')),
np.dtype(mM+'8[3Y]'))
assert_equal(
np.promote_types(np.dtype(mM+'8[62M]'), np.dtype(mM+'8[24M]')),
np.dtype(mM+'8[2M]'))
assert_equal(
np.promote_types(np.dtype(mM+'8[1W]'), np.dtype(mM+'8[2D]')),
np.dtype(mM+'8[1D]'))
assert_equal(
np.promote_types(np.dtype(mM+'8[W]'), np.dtype(mM+'8[13s]')),
np.dtype(mM+'8[s]'))
assert_equal(
np.promote_types(np.dtype(mM+'8[13W]'), np.dtype(mM+'8[49s]')),
np.dtype(mM+'8[7s]'))
# timedelta <op> timedelta raises when there is no reasonable gcd
assert_raises(TypeError, np.promote_types,
np.dtype('m8[Y]'), np.dtype('m8[D]'))
assert_raises(TypeError, np.promote_types,
np.dtype('m8[M]'), np.dtype('m8[W]'))
# timedelta <op> timedelta may overflow with big unit ranges
assert_raises(OverflowError, np.promote_types,
np.dtype('m8[W]'), np.dtype('m8[fs]'))
assert_raises(OverflowError, np.promote_types,
np.dtype('m8[s]'), np.dtype('m8[as]'))
python类promote_types()的实例源码
def print_coercion_table(ntypes, inputfirstvalue, inputsecondvalue, firstarray, use_promote_types=False):
print('+', end=' ')
for char in ntypes:
print(char, end=' ')
print()
for row in ntypes:
if row == 'O':
rowtype = GenericObject
else:
rowtype = np.obj2sctype(row)
print(row, end=' ')
for col in ntypes:
if col == 'O':
coltype = GenericObject
else:
coltype = np.obj2sctype(col)
try:
if firstarray:
rowvalue = np.array([rowtype(inputfirstvalue)], dtype=rowtype)
else:
rowvalue = rowtype(inputfirstvalue)
colvalue = coltype(inputsecondvalue)
if use_promote_types:
char = np.promote_types(rowvalue.dtype, colvalue.dtype).char
else:
value = np.add(rowvalue, colvalue)
if isinstance(value, np.ndarray):
char = value.dtype.char
else:
char = np.dtype(type(value)).char
except ValueError:
char = '!'
except OverflowError:
char = '@'
except TypeError:
char = '#'
print(char, end=' ')
print()
def combine_data_frame_files(output_filename, input_filenames):
in_files = [ h5py.File(f, 'r') for f in input_filenames ]
column_names = [ tuple(sorted(f.attrs.get("column_names"))) for f in in_files ]
uniq = set(column_names)
if len(uniq) > 1:
raise Exception("you're attempting to combine incompatible data frames")
if len(uniq) == 0:
r = "No input files? output: %s, inputs: %s" % (output_filename, str(input_filenames))
raise Exception(r)
column_names = uniq.pop()
if os.path.exists(output_filename):
os.remove(output_filename)
out = h5py.File(output_filename)
out.attrs.create("column_names", column_names)
# Write successive columns
for c in column_names:
datasets = [f[c] for f in in_files if len(f[c]) > 0]
num_w_levels = np.sum([has_levels(ds) for ds in datasets if len(ds) > 0])
fract_w_levels = float(num_w_levels) / (len(datasets) + 1)
if fract_w_levels > 0.25:
combine_level_column(out, datasets, c)
continue
# filter out empty rows from the type promotion, unless they're all empty
types = [get_col_type(ds) for ds in datasets if len(ds) > 0]
if len(types) == 0:
# Fall back to getting column types from empty data frames
types = [get_col_type(f[c]) for f in in_files]
common_type = reduce(np.promote_types, types)
# numpy doesn't understand vlen strings -- so always promote to vlen strings if anything is using them
if vlen_string in types:
common_type = vlen_string
out_ds = out.create_dataset(c, shape=(0,), maxshape=(None,), dtype=common_type, compression=COMPRESSION, shuffle=True, chunks=(CHUNK_SIZE,))
item_count = 0
for ds in datasets:
new_items = ds.shape[0]
out_ds.resize((item_count + new_items,))
data = ds[:]
if has_levels(ds):
levels = get_levels(ds)
data = levels[data]
out_ds[item_count:(item_count + new_items)] = data
item_count += new_items
for in_f in in_files:
in_f.close()
out.close()
def ISTA(
fmatA,
arrB,
numLambda=0.1,
numMaxSteps=100
):
'''
Wrapper around the ISTA algrithm to allow processing of arrays of signals
fmatA - input system matrix
arrB - input data vector (measurements)
numLambda - balancing parameter in optimization problem
between data fidelity and sparsity
numMaxSteps - maximum number of steps to run
numL - step size during the conjugate gradient step
'''
if len(arrB.shape) > 2:
raise ValueError("Only n x m arrays are supported for ISTA")
# calculate the largest singular value to get the right step size
numL = 1.0 / (fmatA.largestSV ** 2)
arrX = np.zeros(
(fmatA.numM, arrB.shape[1]),
dtype=np.promote_types(np.float32, arrB.dtype)
)
# start iterating
for numStep in range(numMaxSteps):
# do the gradient step and threshold
arrStep = arrX - numL * fmatA.backward(fmatA.forward(arrX) - arrB)
arrX = _softThreshold(arrStep, numL * numLambda * 0.5)
# return the unthresholded values for all non-zero support elements
return np.where(arrX != 0, arrStep, arrX)
################################################################################
### Maintenance and Documentation
################################################################################
################################################## inspection interface
def FISTA(
fmatA,
arrB,
numLambda=0.1,
numMaxSteps=100
):
'''
Wrapper around the FISTA algrithm to allow processing of arrays of signals
fmatA - input system matrix
arrB - input data vector (measurements)
numLambda - balancing parameter in optimization problem
between data fidelity and sparsity
numMaxSteps - maximum number of steps to run
numL - step size during the conjugate gradient step
'''
if len(arrB.shape) > 2:
raise ValueError("Only n x m arrays are supported for FISTA")
# calculate the largest singular value to get the right step size
numL = 1.0 / (fmatA.largestSV ** 2)
t = 1
arrX = np.zeros(
(fmatA.numM, arrB.shape[1]),
dtype=np.promote_types(np.float32, arrB.dtype)
)
# initial arrY
arrY = np.copy(arrX)
# start iterating
for numStep in range(numMaxSteps):
arrXold = np.copy(arrX)
# do the gradient step and threshold
arrStep = arrY - numL * fmatA.backward(fmatA.forward(arrY) - arrB)
arrX = _softThreshold(arrStep, numL * numLambda * 0.5)
# update t
tOld =t
t = (1 + np.sqrt(1 + 4 * t ** 2)) / 2
# update arrY
arrY = arrX + ((tOld - 1) / t) * (arrX - arrXold)
# return the unthresholded values for all non-zero support elements
return np.where(arrX != 0, arrStep, arrX)
################################################################################
### Maintenance and Documentation
################################################################################
################################################## inspection interface
def _set_abs(self, abs_key, subjac):
"""
Set sub-Jacobian.
Parameters
----------
abs_key : (str, str)
Absolute name pair of sub-Jacobian.
subjac : int or float or ndarray or sparse matrix
sub-Jacobian as a scalar, vector, array, or AIJ list or tuple.
"""
if not issparse(subjac):
# np.promote_types will choose the smallest dtype that can contain both arguments
subjac = np.atleast_1d(subjac)
safe_dtype = np.promote_types(subjac.dtype, float)
subjac = subjac.astype(safe_dtype, copy=False)
# Bail here so that we allow top level jacobians to be of reduced size when indices are
# specified on driver vars.
if self._override_checks:
self._subjacs[abs_key] = subjac
return
if abs_key in self._subjacs_info:
subjac_info = self._subjacs_info[abs_key][0]
rows = subjac_info['rows']
else:
rows = None
if rows is None:
# Dense subjac
shape = self._abs_key2shape(abs_key)
subjac = np.atleast_2d(subjac)
if subjac.shape == (1, 1):
subjac = subjac[0, 0] * np.ones(shape, dtype=safe_dtype)
else:
subjac = subjac.reshape(shape)
if abs_key in self._subjacs and self._subjacs[abs_key].shape == shape:
np.copyto(self._subjacs[abs_key], subjac)
else:
self._subjacs[abs_key] = subjac.copy()
else:
# Sparse subjac
if subjac.shape == (1,):
subjac = subjac[0] * np.ones(rows.shape, dtype=safe_dtype)
if subjac.shape != rows.shape:
raise ValueError("Sub-jacobian for key %s has "
"the wrong shape (%s), expected (%s)." %
(abs_key, subjac.shape, rows.shape))
if abs_key in self._subjacs and subjac.shape == self._subjacs[abs_key][0].shape:
np.copyto(self._subjacs[abs_key][0], subjac)
else:
self._subjacs[abs_key] = [subjac.copy(), rows, subjac_info['cols']]
else:
self._subjacs[abs_key] = subjac