def test_complex(self):
x = np.array([1, 2, 3, 4+1j], dtype=np.complex)
y = np.array([-1, -2j, 3+1j], dtype=np.complex)
r_z = np.array([3-1j, 6, 8+1j, 11+5j, -5+8j, -4-1j], dtype=np.complex)
r_z = r_z[::-1].conjugate()
z = np.correlate(y, x, mode='full')
assert_array_almost_equal(z, r_z)
python类correlate()的实例源码
def autoCorrelation(s):
def ac1d(x):
var = x.var()
x = x - x.mean()
r = np.correlate(x[-x.size:], x, mode='full')
return r[r.size//2:] / (var * np.arange(x.shape[0], 0, -1))
return np.apply_along_axis(ac1d, 0, s)
def dodemod(rx, P:dict):
aud = None
fs = P['rxfs']
if P['demod']=='chirp':
tx = loadbin(P['txfn'], P['txfs'])
if tx is None:
warnings.warn('simulated chirp reception')
tx = rx
rx = 0.05*rx + 0.1*rx.max()*(np.random.randn(rx.size) + 1j*np.random.randn(rx.size))
txfs = fs
else:
rx = scipy.signal.resample_poly(rx, UP, DOWN)
fs = txfs = P['txfs']
txsec = tx.size/txfs # length of TX in seconds
if P['pri'] is None:
pri=txsec
print(f'Using {pri*1000} ms PRI and {P["Npulse"]} pulses incoherently integrated')
# %% integration
NrxPRI = int(fs * pri) # Number of RX samples per PRI
NrxStack = rx.size // NrxPRI # number of complete PRIs received in this data
Nint = NrxStack // P['Npulse'] # Number of steps we'll take iterating
Nextract = P['Npulse'] * NrxPRI # total number of samples to extract (in general part of one PRI is discarded after numerous PRIs)
ax=None
for i in range(Nint):
ci = slice(i*Nextract, (i+1)*Nextract)
rxint = rx[ci].reshape((NrxPRI, P['Npulse'])).mean(axis=1)
Rxy = np.correlate(tx, rxint, 'full')
ax = plotxcor(Rxy, txfs, ax)
draw(); pause(0.5)
elif P['demod']=='am':
aud = am_demod(P['again']*rx, fs, fsaudio, P['fc'], p.audiobw, frumble=p.frumble, verbose=True)
elif P['demod']=='ssb':
aud = ssb_demod(P['again']*rx, fs, fsaudio, P['fc'], p.audiobw,verbose=True)
return aud,fs
def plot(seq):
"""Plot the autocorrelation of the given sequence."""
import matplotlib.pyplot as plt
bipolar = np.where(seq, 1.0, -1.0)
autocorr = np.correlate(bipolar, bipolar, 'same')
plt.figure()
plt.title("Length {} Gold code autocorrelation".format(len(seq)))
xdata = np.arange(len(seq)) - len(seq) // 2
plt.plot(xdata, autocorr, '.-')
plt.show()
def _print_stats(seq):
bipolar = np.where(seq, 1.0, -1.0)
autocorr = np.correlate(bipolar, bipolar, 'same')
peaks = np.sort(np.abs(autocorr))
peak = peaks[-1]
noise = np.sqrt(np.mean(peaks[:-1]**2))
peak_to_peak2 = peak / peaks[-2]
peak_to_noise = peak / noise
print("Peak amplitude: {:.0f}".format(peak))
print("Largest non-peak amplitude: {:.0f}".format(peaks[-2]))
print("Peak-to-max: {:.2f}".format(peak_to_peak2))
print("Peak-to-noise: {:.2f}".format(peak_to_noise))
def correlationIndividual(data, idx = (0,1), cls = -1, delay = (-100, 100)):
"""Calculate corrs and auto correlation in time between the various measures"""
n = len(idx);
means = np.mean(data[:,:-1], axis = 0);
nd = delay[1] - delay[0] + 1;
cc = np.zeros((nd,n,n))
for i in range(n):
for j in range(n):
if delay[0] < 0:
cm = np.correlate(data[:, i] - means[i], data[-delay[0]:, j] - means[j]);
else:
cm = [0];
if delay[1] > 0:
cp = np.correlate(data[:, j] - means[j], data[delay[1]:, i] - means[i]);
else:
cp = [0];
ca = np.concatenate((cm[1:], cp[::-1]));
if delay[0] > 0:
cc[:,i,j] = ca[delay[0]:];
elif delay[1] < 0:
cc[:,i,j] = ca[:-delay[1]];
else:
cc[:,i,j] = ca;
return cc;
def correlationIndividualStages(data, idx = (0,1), cls = -1, delay = (-100, 100)):
"""Calculate correlation functions in time between the various measures in each stage"""
stages = stageIndex(data, cls = cls);
stages = np.insert(np.append(stages, data.shape[0]), 0, 0);
ns = len(stages) - 1;
n = len(idx);
means = np.mean(data[:,:-1], axis = 0);
nd = delay[1] - delay[0] + 1;
cc = np.zeros((nd,n,n,ns))
for s in range(ns):
dat = data[stages[s]:stages[s+1],:];
for i in range(n):
for j in range(n):
if delay[0] < 0:
cm = np.correlate(dat[:, i] - means[i], dat[-delay[0]:, j] - means[j]);
else:
cm = [0];
if delay[1] > 0:
cp = np.correlate(dat[:, j] - means[j], dat[delay[1]:, i] - means[i]);
else:
cp = [0];
ca = np.concatenate((cm[1:], cp[::-1]));
if delay[0] > 0:
cc[:,i,j,s] = ca[delay[0]:];
elif delay[1] < 0:
cc[:,i,j,s] = ca[:-delay[1]];
else:
cc[:,i,j,s] = ca;
return cc;
def correlogram(experiment):
left = experiment.getresults('left')
right = experiment.getresults('right')
corr_trials = [np.correlate(l,r,'same') for l,r in zip(left,right)]
return np.mean(corr_trials,axis=0)
def five_group_stats(group):
sales = np.array(group['Demanda_uni_equil'].values)
samana = group['Semana'].values
max_index = np.argmax(samana)
returns = group['Dev_proxima'].mean()
#this is signature on when slaes happens
sorted_samana_index = np.argsort(samana)
sorted_sales = sales[sorted_samana_index]
signature = np.sum([ math.pow(2,s-3) for s in samana])
kurtosis = fillna_and_inf(scipy.stats.kurtosis(sorted_sales))
hmean = fillna_and_inf(scipy.stats.hmean(np.where(sales <0, 0.1, sales)))
entropy = fillna_and_inf(scipy.stats.entropy(sales))
std = fillna_and_inf(np.std(sales))
N = len(sales)
ci = fillna_and_inf(calculate_ci(std, N))
corr = fillna_and_inf(scipy.stats.pearsonr(range(N), sorted_sales)[0])
autocorr_list = np.correlate(sorted_sales, sorted_sales, mode='same')
mean_autocorr = fillna_and_inf(np.mean(autocorr_list))
mean = np.mean(sales)
mean_corss_points_count = 0
if N > 1:
high_than_mean = mean < sorted_sales[0]
for i in range(1,N):
if (high_than_mean and mean > sorted_sales[i]) or (not high_than_mean and mean > sorted_sales[i]):
mean_corss_points_count += mean_corss_points_count
high_than_mean = mean < sorted_sales[i]
return mean, N, std, np.median(sales), sales[max_index], samana[max_index], \
returns, signature, kurtosis, hmean, entropy, ci, corr, mean_autocorr, mean_corss_points_count
test_numeric.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 39
收藏 0
点赞 0
评论 0
def test_float(self):
self._setup(np.float)
z = np.correlate(self.x, self.y, 'full')
assert_array_almost_equal(z, self.z1)
z = np.correlate(self.x, self.y[:-1], 'full')
assert_array_almost_equal(z, self.z1_4)
z = np.correlate(self.y, self.x, 'full')
assert_array_almost_equal(z, self.z2)
z = np.correlate(self.x[::-1], self.y, 'full')
assert_array_almost_equal(z, self.z1r)
z = np.correlate(self.y, self.x[::-1], 'full')
assert_array_almost_equal(z, self.z2r)
z = np.correlate(self.xs, self.y, 'full')
assert_array_almost_equal(z, self.zs)
test_numeric.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 35
收藏 0
点赞 0
评论 0
def test_object(self):
self._setup(Decimal)
z = np.correlate(self.x, self.y, 'full')
assert_array_almost_equal(z, self.z1)
z = np.correlate(self.y, self.x, 'full')
assert_array_almost_equal(z, self.z2)
test_numeric.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def test_no_overwrite(self):
d = np.ones(100)
k = np.ones(3)
np.correlate(d, k)
assert_array_equal(d, np.ones(100))
assert_array_equal(k, np.ones(3))
test_numeric.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 29
收藏 0
点赞 0
评论 0
def test_complex(self):
x = np.array([1, 2, 3, 4+1j], dtype=np.complex)
y = np.array([-1, -2j, 3+1j], dtype=np.complex)
r_z = np.array([3-1j, 6, 8+1j, 11+5j, -5+8j, -4-1j], dtype=np.complex)
r_z = r_z[::-1].conjugate()
z = np.correlate(y, x, mode='full')
assert_array_almost_equal(z, r_z)
def test_float(self):
self._setup(np.float)
z = np.correlate(self.x, self.y, 'full')
assert_array_almost_equal(z, self.z1)
z = np.correlate(self.x, self.y[:-1], 'full')
assert_array_almost_equal(z, self.z1_4)
z = np.correlate(self.y, self.x, 'full')
assert_array_almost_equal(z, self.z2)
z = np.correlate(self.x[::-1], self.y, 'full')
assert_array_almost_equal(z, self.z1r)
z = np.correlate(self.y, self.x[::-1], 'full')
assert_array_almost_equal(z, self.z2r)
z = np.correlate(self.xs, self.y, 'full')
assert_array_almost_equal(z, self.zs)
def test_object(self):
self._setup(Decimal)
z = np.correlate(self.x, self.y, 'full')
assert_array_almost_equal(z, self.z1)
z = np.correlate(self.y, self.x, 'full')
assert_array_almost_equal(z, self.z2)
def test_no_overwrite(self):
d = np.ones(100)
k = np.ones(3)
np.correlate(d, k)
assert_array_equal(d, np.ones(100))
assert_array_equal(k, np.ones(3))
def test_complex(self):
x = np.array([1, 2, 3, 4+1j], dtype=np.complex)
y = np.array([-1, -2j, 3+1j], dtype=np.complex)
r_z = np.array([3-1j, 6, 8+1j, 11+5j, -5+8j, -4-1j], dtype=np.complex)
r_z = r_z[::-1].conjugate()
z = np.correlate(y, x, mode='full')
assert_array_almost_equal(z, r_z)
def angle_from_audio(self, file_name, chunks):
[rate, wave] = wavfile.read(file_name)
raw_0 = wave[:, 0].astype(np.float64)
raw_1 = wave[:, 1].astype(np.float64)
for i in range(1, chunks):
start = i*chunks
end = (i+1)*chunks
left = raw_0[start:end]
right = raw_1[start-self.buffer:end+self.buffer]
corr_arr = np.correlate(right, left, 'valid')
max_index = (len(corr_arr)/2)-np.argmax(corr_arr)
time_d = max_index/float(rate)
signal_dist = time_d*self.sound_speed
if (signal_dist != 0 and abs(signal_dist)<=self.mic_dist):
angle = math.degrees(math.asin( signal_dist / self.mic_dist))
self.angles.append(angle)
a = np.array(self.angles)
hist, bins = np.histogram(a, bins=10)
# width = 0.7 * (bins[1] - bins[0])
# center = (bins[:-1] + bins[1:]) / 2
# plt.bar(center, hist, align='center', width=width)
# plt.xlabel('Angle (degrees)', fontsize=16)
# plt.show()
index = np.argmax(hist)
self.angle_pred = bins[index]
print self.angle_pred
traffic_predict_15min.py 文件源码
项目:Spatial-temporal-modeling-prediction
作者: Johnson0722
项目源码
文件源码
阅读 68
收藏 0
点赞 0
评论 0
def autocorrelation(x,lags): #Temporal correlation
n = len(x)
x = np.array(x)
result = [np.correlate(x[i:] - x[i:].mean(),x[:n-i]-x[:n-i].mean())[0]\
/(x[i:].std()*x[:n-i].std()*(n-i)) for i in range(1,lags+1)]
return result
ARIMA_15min.py 文件源码
项目:Spatial-temporal-modeling-prediction
作者: Johnson0722
项目源码
文件源码
阅读 35
收藏 0
点赞 0
评论 0
def autocorrelation(x,lags): #Temporal correlation
n = len(x)
x = np.array(x)
result = [np.correlate(x[i:] - x[i:].mean(),x[:n-i]-x[:n-i].mean())[0]\
/(x[i:].std()*x[:n-i].std()*(n-i)) for i in range(1,lags+1)]
return result