def stft(wav, n_fft=1024, overlap=4, dt=tf.int32, absp=False):
    """Compute a short-time Fourier transform of ``wav`` with TensorFlow.

    The signal is cut into frames of ``n_fft`` samples spaced
    ``n_fft // overlap`` samples apart, each frame is multiplied by a Hanning
    window and transformed with ``tf.fft``.

    Args:
        wav: array-like exposing ``.shape``; ``wav.shape[0]`` must exceed
            ``n_fft``.  Presumably a 1-D sample array -- confirm with callers.
        n_fft: frame length, also the FFT size.
        overlap: divisor of ``n_fft`` that determines the hop between frames.
        dt: dtype of the placeholder created for the signal.
        absp: when true, return magnitudes instead of complex coefficients.

    Returns:
        The evaluated stack of per-frame FFTs (one row per frame), or their
        magnitudes when ``absp`` is set.
    """
    assert (wav.shape[0] > n_fft)

    # Floor division keeps the hop an integer; on Python 3 a plain ``/``
    # produces a float, which ``range`` refuses.
    hop = int(n_fft // overlap)

    placeholder = tf.placeholder(dtype=dt, shape=wav.shape)
    signal = tf.cast(placeholder, tf.float32)

    # np.hanning is the function the removed ``scipy.hanning`` alias pointed to.
    window = tf.constant(np.hanning(n_fft), dtype=tf.float32)

    # NOTE(review): frames start at sample 1, so wav[0] never enters a frame.
    # This looks like an off-by-one but is kept so existing outputs do not
    # change -- confirm before altering.
    frames = [
        tf.fft(tf.cast(tf.multiply(window, signal[i:i + n_fft]), tf.complex64))
        for i in range(1, wav.shape[0] - n_fft, hop)
    ]
    spectrum = tf.pack(frames)
    result = tf.complex_abs(spectrum) if absp else spectrum

    # The context manager closes the session, releasing its resources even
    # when evaluation fails.  As before, ``wav`` is fed at the float32 cast
    # tensor rather than at the placeholder itself.
    with tf.Session() as sess:
        return sess.run(result, feed_dict={signal: wav})
# Example source code for the Python class complex64()
def testNesterovMomentum(self):
    """Check complex Nesterov momentum steps against the NumPy reference.

    Minimizes ``5 * var0 * var0 + 3 * var1`` with
    ``ctf.train.CplxMomentumOptimizer(use_nesterov=True)`` for four steps and
    compares both variables with ``self._update_nesterov_momentum_numpy``
    after every step.
    """
    learning_rate = 2.0
    momentum = 0.9
    for dtype in [tf.complex64]:
        with self.test_session():
            np_dtype = dtype.as_numpy_dtype
            var0 = tf.Variable([1.0, 2.0], dtype=dtype)
            var1 = tf.Variable([3.0, 4.0], dtype=dtype)

            # NumPy mirrors of the variables and their momentum accumulators.
            var0_np = np.array([1.0, 2.0], dtype=np_dtype)
            var1_np = np.array([3.0, 4.0], dtype=np_dtype)
            accum0_np = np.array([0.0, 0.0], dtype=np_dtype)
            accum1_np = np.array([0.0, 0.0], dtype=np_dtype)

            cost = 5 * var0 * var0 + 3 * var1
            global_step = tf.Variable(tf.zeros([], tf.int64), name='global_step')
            mom_op = ctf.train.CplxMomentumOptimizer(
                learning_rate=learning_rate,
                momentum=momentum,
                use_nesterov=True)
            opt_op = mom_op.minimize(cost, global_step, [var0, var1])
            tf.global_variables_initializer().run()

            for _ in range(1, 5):
                opt_op.run()
                # Gradients of the cost: 10 * var0 for var0, constant 3 for var1.
                var0_np, accum0_np = self._update_nesterov_momentum_numpy(
                    var0_np, accum0_np, var0_np * 10, learning_rate, momentum)
                var1_np, accum1_np = self._update_nesterov_momentum_numpy(
                    var1_np, accum1_np, 3, learning_rate, momentum)
                self.assertAllClose(var0_np, var0.eval())
                self.assertAllClose(var1_np, var1.eval())
def testTwoSessions(self):
    """Apply gradients with a single optimizer instance in two graphs."""
    optimizer = ctf.train.CplxAdamOptimizer()

    def apply_once(graph):
        # Build a variable/gradient pair in ``graph`` and apply the gradient.
        with graph.as_default():
            with tf.Session():
                var0 = tf.Variable(
                    np.array([1.0+1.0j, 2.0+2.0j], dtype=np.complex64),
                    name="v0")
                grads0 = tf.constant(
                    np.array([0.1+0.1j, 0.1+0.1j], dtype=np.complex64))
                optimizer.apply_gradients([(grads0, var0)])

    first_graph = tf.Graph()
    apply_once(first_graph)

    second_graph = tf.Graph()
    # If the optimizer saves any state not keyed by graph the following call
    # fails.
    apply_once(second_graph)
def get_mu_tensor(self):
    """Build the ``mu`` tensor from a root of a cubic polynomial.

    The cubic has coefficients ``[-1, 3, -(3 + const_fact), 1]`` (highest
    degree first, as ``np.roots`` expects), where ``const_fact`` is derived
    from ``self._dist_to_opt_avg``, ``self._h_min`` and ``self._grad_var``.
    The root whose real part lies strictly between 0 and 1 and whose
    imaginary part is below 1e-5 in magnitude is selected.

    Returns:
        A tensor equal to the larger of ``real(root)**2`` and
        ``((sqrt(dr) - 1) / (sqrt(dr) + 1))**2`` with
        ``dr = self._h_max / self._h_min``.
    """
    const_fact = self._dist_to_opt_avg**2 * self._h_min**2 / 2 / self._grad_var
    coef = tf.Variable([-1.0, 3.0, 0.0, 1.0], dtype=tf.float32, name="cubic_solver_coef")
    coef = tf.scatter_update(coef, tf.constant(2), -(3 + const_fact))
    roots = tf.py_func(np.roots, [coef], Tout=tf.complex64, stateful=False)

    # filter out the correct root
    real_part = tf.real(roots)
    in_unit_interval = tf.logical_and(
        tf.greater(real_part, tf.constant(0.0)),
        tf.less(real_part, tf.constant(1.0)))
    root_idx = tf.logical_and(
        in_unit_interval, tf.less(tf.abs(tf.imag(roots)), 1e-5))

    # in case there are two duplicated roots satisfying the above condition
    root = tf.reshape(
        tf.gather(tf.gather(roots, tf.where(root_idx)), tf.constant(0)),
        shape=[])

    # An assert op only executes when something depends on it; previously it
    # was created and dropped, so the check never ran.
    single_root = tf.assert_equal(tf.size(root), tf.constant(1))

    dr = self._h_max / self._h_min
    with tf.control_dependencies([single_root]):
        mu = tf.maximum(
            tf.real(root)**2,
            ((tf.sqrt(dr) - 1) / (tf.sqrt(dr) + 1))**2)
    return mu
def apply_mask(spec, mask):
    """Scale the magnitude of ``spec`` by ``mask`` and re-attach its phase.

    The phase is whatever ``get_phase(spec)`` returns; the result is
    ``(|spec| * mask) * exp(1j * phase)`` as a complex64 tensor.
    """
    magnitude = tf.abs(spec)
    phase = get_phase(spec)
    masked_magnitude = tf.cast(tf.multiply(magnitude, mask), tf.complex64)
    # exp(0 + 1j * phase): the zero real part comes from zeros_like(magnitude).
    phasor = tf.exp(tf.complex(tf.zeros_like(magnitude), phase))
    return tf.multiply(masked_magnitude, phasor)
def testCplxL2Loss(self):
    """Expect ``ctf.nn.cplx_l2_loss`` to give 10.0 for a fixed 2x2 input."""
    entries = [1.0+1.0j, 0.0-2.0j, 3.0-0.0j, 2.0+1.0j]
    for dtype in [tf.complex64]:
        with self.test_session(force_gpu=True):
            x = tf.constant(entries, shape=[2, 2], name="x", dtype=dtype)
            loss = ctf.nn.cplx_l2_loss(x)
            self.assertAllClose(10.0, loss.eval())
def _toType(self, dtype):
    """Map a NumPy dtype to the matching TensorFlow dtype.

    Args:
        dtype: NumPy dtype to translate; only ``np.complex64`` is supported.

    Returns:
        ``tf.complex64`` when ``dtype`` equals ``np.complex64``.

    Raises:
        AssertionError: for any other dtype, carrying the offending value.
    """
    if dtype == np.complex64:
        return tf.complex64
    # Raised explicitly (not via ``assert``) so the failure survives
    # ``python -O`` instead of silently returning None.
    raise AssertionError(dtype)
def testApplyAdam(self):
for dtype, force_gpu in itertools.product(
[np.complex64], [True]):
var = np.arange(100).astype(dtype)
m = np.arange(1, 101).astype(dtype)
v = np.arange(101, 201).astype(dtype)
grad = np.arange(100).astype(dtype)
self._testTypesForAdam(var, m, v, grad, force_gpu)
def testBasic(self):
    """One complex SGD step with a Python-complex learning rate.

    After ``sgd_op.run()`` each variable is expected to equal
    ``initial - lr * gradient`` element-wise.
    """
    for dtype in [tf.complex64]:
        with self.test_session(force_gpu=True):
            lr = 3.0-1.5j
            initial = ([1.0+2.0j, 2.0+1.0j], [3.0-4.0j, 4.0-3.0j])
            gradients = ([0.1+0.1j, 0.1-0.1j], [0.01-0.01j, 0.01+0.01j])

            variables = [tf.Variable(start, dtype=dtype) for start in initial]
            grad_tensors = [tf.constant(g, dtype=dtype) for g in gradients]
            sgd_op = ctf.train.CplxGradientDescentOptimizer(
                lr).apply_gradients(zip(grad_tensors, variables))
            tf.global_variables_initializer().run()

            # Fetch params to validate initial values
            for start, variable in zip(initial, variables):
                self.assertAllCloseAccordingToType(start, variable.eval())

            # Run 1 step of sgd
            sgd_op.run()

            # Validate updated params
            for start, grad, variable in zip(initial, gradients, variables):
                expected = [start[0] - lr * grad[0], start[1] - lr * grad[1]]
                self.assertAllCloseAccordingToType(expected, variable.eval())
def testTensorLearningRate(self):
    """One complex SGD step with the learning rate given as a tensor.

    The rate is wrapped in ``tf.constant`` before being handed to the
    optimizer; expected values are computed from the plain Python complex.
    """
    for dtype in [tf.complex64]:
        with self.test_session(force_gpu=True):
            lr = 3.0-1.5j
            initial = ([1.0+2.0j, 2.0+1.0j], [3.0-4.0j, 4.0-3.0j])
            gradients = ([0.1+0.1j, 0.1-0.1j], [0.01-0.01j, 0.01+0.01j])

            variables = [tf.Variable(start, dtype=dtype) for start in initial]
            grad_tensors = [tf.constant(g, dtype=dtype) for g in gradients]
            lrate = tf.constant(lr)
            optimizer = ctf.train.CplxGradientDescentOptimizer(lrate)
            sgd_op = optimizer.apply_gradients(zip(grad_tensors, variables))
            tf.global_variables_initializer().run()

            # Fetch params to validate initial values
            for start, variable in zip(initial, variables):
                self.assertAllCloseAccordingToType(start, variable.eval())

            # Run 1 step of sgd
            sgd_op.run()

            # Validate updated params
            for start, grad, variable in zip(initial, gradients, variables):
                expected = [start[0] - lr * grad[0], start[1] - lr * grad[1]]
                self.assertAllCloseAccordingToType(expected, variable.eval())
def testGradWrtRef(self):
    """Gradients of ``vars_[0]._ref() + vars_[1]`` should be 1 for both vars.

    Uses ``ctf.train.CplxGradientDescentOptimizer.compute_gradients`` on two
    single-element complex64 variables initialised to 1.0 and 3.0.
    """
    for dtype in [tf.complex64]:
        with self.test_session(force_gpu=True):
            lr = 3.0-1.5j
            opt = ctf.train.CplxGradientDescentOptimizer(lr)
            # The earlier complex-valued ``values`` list was overwritten
            # before any use, so only the effective assignment is kept.
            values = [1.0, 3.0]
            vars_ = [tf.Variable([v], dtype=dtype) for v in values]
            grads_and_vars = opt.compute_gradients(
                vars_[0]._ref() + vars_[1], vars_)
            tf.global_variables_initializer().run()
            for grad, _ in grads_and_vars:
                self.assertAllCloseAccordingToType([1.0], grad.eval())
def testTensorLearningRate(self):
    """Three Adam steps with a tensor learning rate versus a NumPy reference.

    Before each step the beta power accumulators are expected to equal
    ``0.9 ** t`` and ``0.999 ** t``; after each step the variables are
    compared with ``adam_update_numpy``.
    """
    for dtype in [tf.complex64]:
        with self.test_session(force_gpu=True):
            np_dtype = dtype.as_numpy_dtype

            # Initialize variables for numpy implementation.
            m0 = v0 = m1 = v1 = 0.0
            var0_np = np.array([1.0-1.0j, 2.0-2.0j], dtype=np_dtype)
            grads0_np = np.array([0.1+0.1j, 0.1-0.1j], dtype=np_dtype)
            var1_np = np.array([3.0+3.0j, 4.0-4.0j], dtype=np_dtype)
            grads1_np = np.array([0.01-0.01j, 0.01+0.01j], dtype=np_dtype)

            var0 = tf.Variable(var0_np)
            var1 = tf.Variable(var1_np)
            grads0 = tf.constant(grads0_np)
            grads1 = tf.constant(grads1_np)
            opt = tf.train.AdamOptimizer(tf.constant(0.001))
            update = opt.apply_gradients(
                zip([grads0, grads1], [var0, var1]))
            tf.global_variables_initializer().run()

            # Fetch params to validate initial values
            self.assertAllClose([1.0-1.0j, 2.0-2.0j], var0.eval())
            self.assertAllClose([3.0+3.0j, 4.0-4.0j], var1.eval())

            beta1_power, beta2_power = opt._get_beta_accumulators()

            # Run 3 steps of Adam
            for step in range(1, 4):
                self.assertAllCloseAccordingToType(
                    0.9 ** step, beta1_power.eval())
                self.assertAllCloseAccordingToType(
                    0.999 ** step, beta2_power.eval())
                update.run()

                var0_np, m0, v0 = adam_update_numpy(
                    var0_np, grads0_np, step, m0, v0)
                var1_np, m1, v1 = adam_update_numpy(
                    var1_np, grads1_np, step, m1, v1)

                # Validate updated params
                self.assertAllCloseAccordingToType(var0_np, var0.eval())
                self.assertAllCloseAccordingToType(var1_np, var1.eval())
def testComplexReduce3D(self):
# Create a 3D array of floats and reduce across all possible
# dimensions
np_arr = (np.linspace(10, -10, 30) +
1j*np.linspace(-10, 10, 30)).reshape(
[2, 3, 5]).astype(np.complex64)
self._compareAll(np_arr, None)
self._compareAll(np_arr, [])
self._compareAll(np_arr, [0])
self._compareAll(np_arr, [1])
self._compareAll(np_arr, [2])
self._compareAll(np_arr, [0, 1])
self._compareAll(np_arr, [1, 2])
self._compareAll(np_arr, [0, 2])
self._compareAll(np_arr, [0, 1, 2])
def testInfinity(self):
for dtype in [np.complex64]:
for special_value_x in [-np.inf, np.inf]:
for special_value_y in [-np.inf, np.inf]:
np_arr = np.array([special_value_x, special_value_y]).astype(dtype)
self._compareAll(np_arr, None)
def _testTypes(self, vals):
    """Check that ``self._initFetch`` returns ``vals`` cast to complex64.

    The fetch runs first with ``use_gpu=False`` and then with
    ``use_gpu=True``; both results must equal the cast input.
    """
    for dtype in [np.complex64]:
        self.setUp()
        x = vals.astype(dtype)
        tftype = _NP_TO_TF[dtype]
        # NOTE(touts): the GPU test should pass for all types, whether the
        # Variable op has an implementation for that type on GPU as we expect
        # that Variable and Assign have GPU implementations for matching tf.
        for use_gpu in (False, True):
            fetched = self._initFetch(x, tftype, use_gpu=use_gpu)
            self.assertAllEqual(x, fetched)
def testset_shape(self):
    """``variable_op`` reports its shape unless ``set_shape=False``."""
    requested_shape = [1, 2]
    shaped = state_ops.variable_op(requested_shape, tf.complex64)
    self.assertEqual([1, 2], shaped.get_shape())

    unshaped = state_ops.variable_op(
        requested_shape, tf.complex64, set_shape=False)
    self.assertEqual(tensor_shape.unknown_shape(), unshaped.get_shape())
def testAssign(self):
    """Variable and ``tf.assign`` result both take the value's shape."""
    value = np.array([[42.0+42.0j, 43.0+43.0j]])
    target = state_ops.variable_op(value.shape, tf.complex64)
    self.assertShapeEqual(value, target)
    self.assertShapeEqual(value, tf.assign(target, value))
def testAssignNoValidateShape(self):
    """Shapes still match the value when ``validate_shape=False``."""
    value = np.array([[42.0+42.0j, 43.0+43.0j]])
    target = state_ops.variable_op(value.shape, tf.complex64)
    self.assertShapeEqual(value, target)
    result = tf.assign(target, value, validate_shape=False)
    self.assertShapeEqual(value, result)
def testAssignNoVarShape(self):
    """Assigning to a shapeless variable yields the value's shape."""
    value = np.array([[42.0+42.0j, 43.0+43.0j]])
    target = state_ops.variable_op(
        value.shape, tf.complex64, set_shape=False)
    self.assertEqual(tensor_shape.unknown_shape(), target.get_shape())
    self.assertShapeEqual(value, tf.assign(target, value))
def _NewShapelessTensor(self):
    """Return a complex64 placeholder after checking its shape is unknown."""
    placeholder = tf.placeholder(tf.complex64)
    expected_shape = tensor_shape.unknown_shape()
    self.assertEqual(expected_shape, placeholder.get_shape())
    return placeholder
def testAssignNoValueShape(self):
    """Assigning a shapeless value keeps the variable's declared shape."""
    shapeless_value = self._NewShapelessTensor()
    shape = [1, 2]
    target = state_ops.variable_op(shape, tf.complex64)
    result = tf.assign(target, shapeless_value)
    for tensor in (target, result):
        self.assertEqual(shape, tensor.get_shape())
def testAssignNoValueShapeNoValidateShape(self):
    """With ``validate_shape=False`` the assign result has unknown shape."""
    shapeless_value = self._NewShapelessTensor()
    shape = [1, 2]
    target = state_ops.variable_op(shape, tf.complex64)
    self.assertEqual(shape, target.get_shape())
    result = tf.assign(target, shapeless_value, validate_shape=False)
    self.assertEqual(tensor_shape.unknown_shape(), result.get_shape())
def testAssignNoShape(self):
    """Shapeless variable plus shapeless value gives an unknown shape."""
    with self.test_session():
        shapeless_value = self._NewShapelessTensor()
        target = state_ops.variable_op(
            [1, 2], tf.complex64, set_shape=False)
        self.assertEqual(tensor_shape.unknown_shape(), target.get_shape())
        result = tf.assign(target, shapeless_value)
        self.assertEqual(tensor_shape.unknown_shape(), result.get_shape())
def testAssignNoShapeNoValidateShape(self):
    """Same as the shapeless assign case, with ``validate_shape=False``."""
    with self.test_session():
        shapeless_value = self._NewShapelessTensor()
        target = state_ops.variable_op(
            [1, 2], tf.complex64, set_shape=False)
        self.assertEqual(tensor_shape.unknown_shape(), target.get_shape())
        result = tf.assign(target, shapeless_value, validate_shape=False)
        self.assertEqual(tensor_shape.unknown_shape(), result.get_shape())
def testAssignUpdateNoVarShape(self):
    """assign_add/assign_sub take the operand's shape on a shapeless var."""
    target = state_ops.variable_op([1, 2], tf.complex64, set_shape=False)
    updates = (
        (tf.assign_add, [[2.0+2.0j, 3.0+3.0j]]),
        (tf.assign_sub, [[12.0+12.0j, 13.0+13.0j]]),
    )
    for update_fn, operand in updates:
        updated = update_fn(target, operand)
        self.assertEqual([1, 2], updated.get_shape())
def testAssignUpdateNoValueShape(self):
    """assign_add/assign_sub keep the variable's shape for shapeless input."""
    target = state_ops.variable_op([1, 2], tf.complex64)
    for update_fn in (tf.assign_add, tf.assign_sub):
        updated = update_fn(target, self._NewShapelessTensor())
        self.assertEqual([1, 2], updated.get_shape())
def testAssignUpdateNoShape(self):
    """With no shape on either side the update result shape is unknown."""
    target = state_ops.variable_op([1, 2], tf.complex64, set_shape=False)
    for update_fn in (tf.assign_add, tf.assign_sub):
        updated = update_fn(target, self._NewShapelessTensor())
        self.assertEqual(tensor_shape.unknown_shape(), updated.get_shape())
def testTemporaryVariable(self):
    """Assign, add to, then destroy a temporary variable named "foo".

    Destroying the variable is expected to yield the accumulated value
    ``[[10+12j, 12+10j]]``.
    """
    with self.test_session(use_gpu=True):
        temp = gen_state_ops._temporary_variable(
            [1, 2], tf.complex64, var_name="foo")
        assigned = tf.assign(temp, [[4.0+5.0j, 5.0+4.0j]])
        accumulated = tf.assign_add(assigned, [[6.0+7.0j, 7.0+6.0j]])
        final = gen_state_ops._destroy_temporary_variable(
            accumulated, var_name="foo")
        self.assertAllClose([[10.0+12.0j, 12.0+10.0j]], final.eval())
def testDestroyNonexistentTemporaryVariable(self):
    """Destroying under the name "bad" must raise ``NotFoundError``."""
    with self.test_session(use_gpu=True):
        temp = gen_state_ops._temporary_variable([1, 2], tf.complex64)
        destroyed = gen_state_ops._destroy_temporary_variable(
            temp, var_name="bad")
        with self.assertRaises(errors.NotFoundError):
            destroyed.eval()
def testDestroyTemporaryVariableTwice(self):
    """Summing two destroy ops with the same ``var_name`` must fail.

    Evaluating the sum is expected to raise ``errors.NotFoundError``.
    """
    with self.test_session(use_gpu=True):
        temp = gen_state_ops._temporary_variable([1, 2], tf.complex64)
        first, second = (
            gen_state_ops._destroy_temporary_variable(temp, var_name="dup")
            for _ in range(2)
        )
        total = first + second
        with self.assertRaises(errors.NotFoundError):
            total.eval()