def cmdct(x, odd=True):
""" Calculate complex modified discrete cosine transform of input
inefficient pure-Python method.
Use only for testing.
Parameters
----------
X : array_like
The input signal
odd : boolean, optional
Switch to oddly stacked transform. Defaults to :code:`True`.
Returns
-------
out : array_like
The output signal
"""
return trans(x, func=lambda x: numpy.cos(x) - 1j * numpy.sin(x), odd=odd)
python类complex()的实例源码
def icmdct(X, odd=True):
""" Calculate inverse complex modified discrete cosine transform of input
signal in an inefficient pure-Python method.
Use only for testing.
Parameters
----------
X : array_like
The input signal
odd : boolean, optional
Switch to oddly stacked transform. Defaults to :code:`True`.
Returns
-------
out : array_like
The output signal
"""
return itrans(X, func=lambda x: numpy.cos(x) + 1j * numpy.sin(x), odd=odd)
def sphankel1(n, kr):
"""Spherical Hankel (first kind) of order n at kr
Parameters
----------
n : array_like
Order
kr: array_like
Argument
Returns
-------
hn1 : complex float
Spherical Hankel function hn (first kind)
"""
n, kr = scalar_broadcast_match(n, kr)
hn1 = _np.full(n.shape, _np.nan, dtype=_np.complex_)
kr_nonzero = kr != 0
hn1[kr_nonzero] = _np.sqrt(_np.pi / 2) / _np.lib.scimath.sqrt(kr[kr_nonzero]) * hankel1(n[kr_nonzero] + 0.5, kr[kr_nonzero])
return hn1
def sphankel2(n, kr):
"""Spherical Hankel (second kind) of order n at kr
Parameters
----------
n : array_like
Order
kr: array_like
Argument
Returns
-------
hn2 : complex float
Spherical Hankel function hn (second kind)
"""
n, kr = scalar_broadcast_match(n, kr)
hn2 = _np.full(n.shape, _np.nan, dtype=_np.complex_)
kr_nonzero = kr != 0
hn2[kr_nonzero] = _np.sqrt(_np.pi / 2) / _np.lib.scimath.sqrt(kr[kr_nonzero]) * hankel2(n[kr_nonzero] + 0.5, kr[kr_nonzero])
return hn2
def dspbessel(n, kr):
"""Derivative of spherical Bessel (first kind) of order n at kr
Parameters
----------
n : array_like
Order
kr: array_like
Argument
Returns
-------
J' : complex float
Derivative of spherical Bessel
"""
return _np.squeeze((n * spbessel(n - 1, kr) - (n + 1) * spbessel(n + 1, kr)) / (2 * n + 1))
def dsphankel1(n, kr):
"""Derivative spherical Hankel (first kind) of order n at kr
Parameters
----------
n : array_like
Order
kr: array_like
Argument
Returns
-------
dhn1 : complex float
Derivative of spherical Hankel function hn' (second kind)
"""
n, kr = scalar_broadcast_match(n, kr)
dhn1 = _np.full(n.shape, _np.nan, dtype=_np.complex_)
kr_nonzero = kr != 0
dhn1[kr_nonzero] = 0.5 * (sphankel1(n[kr_nonzero] - 1, kr[kr_nonzero]) - sphankel1(n[kr_nonzero] + 1, kr[kr_nonzero]) - sphankel1(n[kr_nonzero], kr[kr_nonzero]) / kr[kr_nonzero])
return dhn1
def dsphankel2(n, kr):
"""Derivative spherical Hankel (second kind) of order n at kr
Parameters
----------
n : array_like
Order
kr: array_like
Argument
Returns
-------
dhn2 : complex float
Derivative of spherical Hankel function hn' (second kind)
"""
n, kr = scalar_broadcast_match(n, kr)
dhn2 = _np.full(n.shape, _np.nan, dtype=_np.complex_)
kr_nonzero = kr != 0
dhn2[kr_nonzero] = 0.5 * (sphankel2(n[kr_nonzero] - 1, kr[kr_nonzero]) - sphankel2(n[kr_nonzero] + 1, kr[kr_nonzero]) - sphankel2(n[kr_nonzero], kr[kr_nonzero]) / kr[kr_nonzero])
return dhn2
def sph_harm_all(nMax, az, el, type='complex'):
'''Compute all sphercial harmonic coefficients up to degree nMax.
Parameters
----------
nMax : (int)
Maximum degree of coefficients to be returned. n >= 0
az: (float), array_like
Azimuthal (longitudinal) coordinate [0, 2pi], also called Theta.
el : (float), array_like
Elevation (colatitudinal) coordinate [0, pi], also called Phi.
Returns
-------
y_mn : (complex float), array_like
Complex spherical harmonics of degrees n [0 ... nMax] and all corresponding
orders m [-n ... n], sampled at [az, el]. dim1 corresponds to az/el pairs,
dim2 to oder/degree (m, n) pairs like 0/0, -1/1, 0/1, 1/1, -2/2, -1/2 ...
'''
m, n = mnArrays(nMax)
mA, azA = _np.meshgrid(m, az)
nA, elA = _np.meshgrid(n, el)
return sph_harm(mA, nA, azA, elA, type=type)
def test_simple_complex(self):
# Test native complex input with native double scalar min/max.
# Test native input with complex double scalar min/max.
a = 3 * self._generate_data_complex(self.nr, self.nc)
m = -0.5
M = 1.
ac = self.fastclip(a, m, M)
act = self.clip(a, m, M)
assert_array_strict_equal(ac, act)
# Test native input with complex double scalar min/max.
a = 3 * self._generate_data(self.nr, self.nc)
m = -0.5 + 1.j
M = 1. + 2.j
ac = self.fastclip(a, m, M)
act = self.clip(a, m, M)
assert_array_strict_equal(ac, act)
def test_against_cmath(self):
import cmath
points = [-1-1j, -1+1j, +1-1j, +1+1j]
name_map = {'arcsin': 'asin', 'arccos': 'acos', 'arctan': 'atan',
'arcsinh': 'asinh', 'arccosh': 'acosh', 'arctanh': 'atanh'}
atol = 4*np.finfo(np.complex).eps
for func in self.funcs:
fname = func.__name__.split('.')[-1]
cname = name_map.get(fname, fname)
try:
cfunc = getattr(cmath, cname)
except AttributeError:
continue
for p in points:
a = complex(func(np.complex_(p)))
b = cfunc(p)
assert_(abs(a - b) < atol, "%s %s: %s; cmath: %s" % (fname, p, a, b))
def test_complex_nan_comparisons():
nans = [complex(np.nan, 0), complex(0, np.nan), complex(np.nan, np.nan)]
fins = [complex(1, 0), complex(-1, 0), complex(0, 1), complex(0, -1),
complex(1, 1), complex(-1, -1), complex(0, 0)]
with np.errstate(invalid='ignore'):
for x in nans + fins:
x = np.array([x])
for y in nans + fins:
y = np.array([y])
if np.isfinite(x) and np.isfinite(y):
continue
assert_equal(x < y, False, err_msg="%r < %r" % (x, y))
assert_equal(x > y, False, err_msg="%r > %r" % (x, y))
assert_equal(x <= y, False, err_msg="%r <= %r" % (x, y))
assert_equal(x >= y, False, err_msg="%r >= %r" % (x, y))
assert_equal(x == y, False, err_msg="%r == %r" % (x, y))
def test_basic(self):
dt_numeric = np.typecodes['AllFloat'] + np.typecodes['AllInteger']
dt_complex = np.typecodes['Complex']
# test real
a = np.eye(3)
for dt in dt_numeric + 'O':
b = a.astype(dt)
res = np.vdot(b, b)
assert_(np.isscalar(res))
assert_equal(np.vdot(b, b), 3)
# test complex
a = np.eye(3) * 1j
for dt in dt_complex + 'O':
b = a.astype(dt)
res = np.vdot(b, b)
assert_(np.isscalar(res))
assert_equal(np.vdot(b, b), 3)
# test boolean
b = np.eye(3, dtype=np.bool)
res = np.vdot(b, b)
assert_(np.isscalar(res))
assert_equal(np.vdot(b, b), True)
def set_params(self, values):
"""
Coverts the symbolic values into numerical values.
The order of the parameters must be the same as the one given
in the class instantiaion.
:param list values: a list containing the values for the sympy symbols.
:return: None
:rtype: None
:raises: RuntimeError
"""
if self._symFClambda != None:
self.fc_set( np.array(self._symFClambda(*values), \
dtype=np.complex), \
self._inputType)
else:
raise RuntimeError("Symbolic FC not defined!")
def test_dipolar_tensor(self):
# initial stupid test...
###### TODO : do a reasonable test!!! ######
p = np.array([[0.,0.,0.]])
fc = np.array([[0.,0.,1.]],dtype=np.complex)
k = np.array([0.,0.,0.0])
phi= np.array([0.,])
mu = np.array([0.5,0.5,0.5])
sc = np.array([10,10,10],dtype=np.int32)
latpar = np.diag([2.,2.,2.])
r = 10.
res = lfcext.DipolarTensor(p,mu,sc,latpar,r)
np.testing.assert_array_almost_equal(res, np.zeros([3,3]))
mu = np.array([0.25,0.25,0.25])
res = lfcext.DipolarTensor(p,mu,sc,latpar,r)
np.testing.assert_array_almost_equal(np.trace(res), np.zeros([3]))
np.testing.assert_array_almost_equal(res, res.copy().T)
def update_descriptors(self):
logger.debug("Updating Plotter %s descriptors based on input descriptor %s", self.name, self.sink.descriptor)
self.stream = self.sink.input_streams[0]
self.descriptor = self.sink.descriptor
try:
self.time_pts = self.descriptor.axes[self.descriptor.axis_num("time")].points
self.record_length = len(self.time_pts)
except ValueError:
raise ValueError("Single shot filter sink does not appear to have a time axis!")
self.num_segments = len(self.sink.descriptor.axes[self.descriptor.axis_num("segment")].points)
self.ground_data = np.zeros((self.record_length, self.num_segments//2), dtype=np.complex)
self.excited_data = np.zeros((self.record_length, self.num_segments//2), dtype=np.complex)
output_descriptor = DataStreamDescriptor()
output_descriptor.axes = [_ for _ in self.descriptor.axes if type(_) is SweepAxis]
output_descriptor._exp_src = self.sink.descriptor._exp_src
output_descriptor.dtype = np.complex128
for os in self.fidelity.output_streams:
os.set_descriptor(output_descriptor)
os.end_connector.update_descriptors()
def __init__(self, module, min_delay_ms, autostart=True):
self._module = module
self._min_delay_ms = min_delay_ms
self.current_point = 0
self.current_avg = 0
self.n_points = self._module.points
self._paused = True
self._fut = None # placeholder for next point future
self.never_started = True
super(NaCurveFuture, self).__init__()
self.data_x = copy(self._module._data_x) # In case of saving latter.
self.data_avg = np.empty(self.n_points,
dtype=np.complex)
self.data_avg.fill(np.nan)
self.data_amp = np.empty(self.n_points)
self.data_amp.fill(np.nan)
# self.start()
self._reset_benchmark()
self.measured_time_per_point = np.nan # measured over last scan
if autostart:
self.start()
def threshold_hook(self, current_val): # goes in the module...
"""
A convenience function to stop the run upon some condition
(such as reaching of a threshold. current_val is the complex amplitude
of the last data point).
To be overwritten in derived class...
Parameters
----------
current_val
Returns
-------
"""
pass
# Concrete implementation of AcquisitionModule methods and attributes:
# --------------------------------------------------------------------
def set_value(self, obj, value):
"""
the master's setter writes its value to the slave lists
"""
real, complex = [], []
for v in value:
# separate real from complex values
if np.imag(v) == 0:
real.append(v.real)
else:
complex.append(v)
# avoid calling setup twice
with obj.do_setup:
setattr(obj, 'complex_' + self.name, complex)
setattr(obj, 'real_' + self.name, real)
# this property should have call_setup=True, such that obj._setup()
# is called automatically after this function
def list_changed(self, module, operation, index, value=None):
""" make sure that an element from one of the four lists is selected at once"""
if operation == 'select':
# unselect all others
if not hasattr(module, '_selecting') or not getattr(module, '_selecting'):
try:
setattr(module, '_selecting', True)
for name in [start+'_'+end for start in ['real', 'complex'] for end in ['poles', 'zeros']]:
if name != self.name:
getattr(module, name).selected = None
module._logger.info('%s.selected = None', name)
setattr(module, '_selected_pole_or_zero', self.name)
setattr(module, '_selected_index', index)
finally:
setattr(module, '_selecting', False)
super(IirFloatListProperty, self).list_changed(module, operation, index, value=value)
module._signal_launcher.update_plot.emit()
else:
super(IirFloatListProperty, self).list_changed(module, operation, index, value=value)
def freqz_(sys, w, dt=8e-9):
"""
This function computes the frequency response of a zpk system at an
array of frequencies.
It loosely mimicks 'scipy.signal.frequresp'.
Parameters
----------
system: (zeros, poles, k)
zeros and poles both in rad/s, k is the actual coefficient, not DC gain
w: np.array
frequencies in rad/s
dt: sampling time
Returns
-------
np.array(..., dtype=np.complex) with the response
"""
z, p, k = sys
b, a = sig.zpk2tf(z, p, k)
_, h = sig.freqz(b, a, worN=w*dt)
return h
def tf_inputfilter(self, inputfilter=None, frequencies=None): # input
# anti aliasing filter and additional delay model
# delay comes from latency in fpga + on average half sampling time
moduledelay = self.moduledelay + self.loops / 2.0
if inputfilter is None:
inputfilter = self.inputfilter
if frequencies is None:
frequencies = self.frequencies
frequencies = np.asarray(frequencies, dtype=np.complex)
try:
len(inputfilter)
except:
inputfilter = [inputfilter] # make it iterable
tf = frequencies*0 + 1.0
for f in inputfilter:
if f > 0: # lowpass
tf /= (1.0 + 1j * frequencies / f)
moduledelay += 1
elif f < 0: # highpass
tf /= (1.0 + 1j * f / frequencies)
moduledelay += 2
# add delay
delay = moduledelay * self.dt # + extradelay
tf *= np.exp(-1j*delay*frequencies*2*np.pi)
return tf
def tf_partialfraction(self, frequencies=None):
"""
Returns the transfer function just before the partial fraction
expansion for frequencies.
Parameters
----------
sys: (poles, zeros, k)
dt: sampling time
continuous: if True, returns the transfer function in continuous
time domain, if False converts to discrete one
method: method for scipy.signal.cont2discrete
alpha: alpha for above method (see scipy documentation)
Returns
-------
np.array(..., dtype=np.complex)
"""
# this code is more or less a direct copy of get_coeff()
if frequencies is None:
frequencies = self.frequencies
frequencies = np.asarray(frequencies, dtype=np.complex128)
r, p, c = self.rp_continuous
h = freqs_rp(r, p, c, frequencies*2*np.pi)
return h * self.tf_inputfilter(frequencies=frequencies)
def _filter_transfer_function(cls,
frequencies, filter_values,
frequency_correction=1.):
"""
Transfer function of the inputfilter part of a pid module
"""
frequencies = np.array(frequencies, dtype=np.complex)
module_delay = 0
tf = np.ones(len(frequencies), dtype=complex)
# input filter modelisation
if not isinstance(filter_values, list):
filter_values = list([filter_values])
for f in filter_values:
if f == 0:
continue
elif f > 0: # lowpass
tf /= (1.0 + 1j * frequencies / f)
module_delay += 2 # two cycles extra delay per lowpass
elif f < 0: # highpass
tf /= (1.0 + 1j * f / frequencies)
# plus is correct here since f already has a minus sign
module_delay += 1 # one cycle extra delay per highpass
delay = module_delay * 8e-9 / frequency_correction
tf *= np.exp(-1j * delay * frequencies * 2 * np.pi)
return tf
def merge_waveform(n, chAB, chAm1, chAm2, chBm1, chBm2):
'''
Builds packed I and Q waveforms from the nth mini LL, merging in marker data.
'''
wfAB = np.array([], dtype=np.complex)
for entry in chAB['linkList'][n % len(chAB['linkList'])]:
if not entry.isTimeAmp:
wfAB = np.append(wfAB, chAB['wfLib'][entry.key])
else:
wfAB = np.append(wfAB, chAB['wfLib'][entry.key][0] *
np.ones(entry.length * entry.repeat))
wfAm1 = marker_waveform(chAm1['linkList'][n % len(chAm1['linkList'])],
chAm1['wfLib'])
wfAm2 = marker_waveform(chAm2['linkList'][n % len(chAm2['linkList'])],
chAm2['wfLib'])
wfBm1 = marker_waveform(chBm1['linkList'][n % len(chBm1['linkList'])],
chBm1['wfLib'])
wfBm2 = marker_waveform(chBm2['linkList'][n % len(chBm2['linkList'])],
chBm2['wfLib'])
wfA = pack_waveform(np.real(wfAB), wfAm1, wfAm2)
wfB = pack_waveform(np.imag(wfAB), wfBm1, wfBm2)
return wfA, wfB
def CLEAR(amp=1, length=0, sigma=0, sampling_rate=1e9, **params):
"""
Pulse shape to quickly deplete the cavity at the end of a measurement.
measPulse followed by 2 steps of length step_length and amplitudes amp1, amp2.
"""
if 'amp1' not in params:
params['amp1'] = 0
if 'amp2' not in params:
params['amp2'] = 0
if 'step_length' not in params:
params['step_length'] = 100e-9
timePts = (1.0 / sampling_rate) * np.arange(np.round((length-2*params['step_length']) * sampling_rate))
flat_step = amp * (0.6 * np.exp(-timePts / sigma) + 0.4).astype(np.complex)
numPts_clear_step = int(np.round(params['step_length'] * sampling_rate))
clear_step_one = amp * params['amp1'] * np.ones(numPts_clear_step, dtype=np.complex)
clear_step_two = amp * params['amp2'] * np.ones(numPts_clear_step, dtype=np.complex)
return np.append(flat_step, [clear_step_one, clear_step_two])
def iqplot(data, spec='.', labels=None):
"""Plot signal points.
:param data: complex baseband signal points
:param spec: plot specifier (see :func:`matplotlib.pyplot.plot`)
:param labels: label for each signal point
>>> import arlpy
>>> arlpy.comms.iqplot(arlpy.comms.psk(8))
>>> arlpy.comms.iqplot(arlpy.comms.qam(16), 'rx')
>>> arlpy.comms.iqplot(arlpy.comms.psk(4), labels=['00', '01', '11', '10'])
"""
import matplotlib.pyplot as plt
data = _np.asarray(data)
if labels is None:
plt.plot(data.real, data.imag, spec)
else:
if labels == True:
labels = range(len(data))
for i in range(len(data)):
plt.text(data[i].real, data[i].imag, str(labels[i]))
plt.axis([-2, 2, -2, 2])
plt.grid()
plt.show()
def diff_decode(x):
"""Decode phase differential baseband signal.
:param x: complex baseband differential data to decode
:returns: decoded complex baseband data of length len(x)-1
>>> import arlpy
>>> d1 = arlpy.comms.random_data(100, 4)
>>> qpsk = arlpy.comms.psk(4)
>>> x = arlpy.comms.modulate(d1, qpsk)
>>> y = arlpy.comms.diff_encode(x)
>>> z = arlpy.comms.diff_decode(y)
>>> d2 = arlpy.comms.demodulate(z, qpsk)
>>> arlpy.comms.ser(d1, d2)
0.0
"""
x = _np.asarray(x)
y = _np.array(x)
y[1:] *= x[:-1].conj()
return y[1:]
def pb2bb(x, fs, fc, fd=None, flen=127, cutoff=None):
"""Convert passband signal to baseband.
The baseband conversion uses a low-pass filter after downconversion, with a
default cutoff frequency of `0.6*fd`, if `fd` is specified, or `1.1*fc` if `fd`
is not specified. Alternatively, the user may specify the cutoff frequency
explicitly.
For communication applications, one may wish to use :func:`arlpy.comms.downconvert` instead,
as that function supports matched filtering with a pulse shape rather than a generic
low-pass filter.
:param x: passband signal
:param fs: sampling rate of passband signal in Hz
:param fc: carrier frequency in passband in Hz
:param fd: sampling rate of baseband signal in Hz (``None`` => same as `fs`)
:param flen: number of taps in the low-pass FIR filter
:param cutoff: cutoff frequency in Hz (``None`` means auto-select)
:returns: complex baseband signal, sampled at `fd`
"""
if cutoff is None:
cutoff = 0.6*fd if fd is not None else 1.1*fc
y = x * _np.sqrt(2)*_np.exp(2j*_np.pi*fc*time(x,fs))
hb = _sig.firwin(flen, cutoff=cutoff, nyq=fs/2.0)
y = _sig.filtfilt(hb, 1, y)
if fd is not None and fd != fs:
y = _sig.resample_poly(y, 2*fd, fs)[::2]
return y
def test_simple_complex(self):
# Test native complex input with native double scalar min/max.
# Test native input with complex double scalar min/max.
a = 3 * self._generate_data_complex(self.nr, self.nc)
m = -0.5
M = 1.
ac = self.fastclip(a, m, M)
act = self.clip(a, m, M)
assert_array_strict_equal(ac, act)
# Test native input with complex double scalar min/max.
a = 3 * self._generate_data(self.nr, self.nc)
m = -0.5 + 1.j
M = 1. + 2.j
ac = self.fastclip(a, m, M)
act = self.clip(a, m, M)
assert_array_strict_equal(ac, act)
def test_against_cmath(self):
import cmath
points = [-1-1j, -1+1j, +1-1j, +1+1j]
name_map = {'arcsin': 'asin', 'arccos': 'acos', 'arctan': 'atan',
'arcsinh': 'asinh', 'arccosh': 'acosh', 'arctanh': 'atanh'}
atol = 4*np.finfo(np.complex).eps
for func in self.funcs:
fname = func.__name__.split('.')[-1]
cname = name_map.get(fname, fname)
try:
cfunc = getattr(cmath, cname)
except AttributeError:
continue
for p in points:
a = complex(func(np.complex_(p)))
b = cfunc(p)
assert_(abs(a - b) < atol, "%s %s: %s; cmath: %s" % (fname, p, a, b))