def downsample_utterance(features, seglist, n):
    """
    Build one fixed-length embedding per segment of an utterance.

    Every `(i, j)` pair in `seglist` selects rows `i` through `j` (inclusive)
    of `features`. The slice is transposed, resampled to `n` points along the
    second axis with `signal.resample`, and flattened in C order.

    Returns the embeddings stacked with `np.asarray`, one row per segment.
    """
    def embed(first, last):
        # `last` is inclusive, hence the +1 on the slice end.
        segment = features[first:last + 1, :].T
        return signal.resample(segment, n, axis=1).flatten("C")

    return np.asarray([embed(first, last) for first, last in seglist])
#-----------------------------------------------------------------------------#
# MAIN FUNCTION #
#-----------------------------------------------------------------------------#
# Example source code for the Python resample() function
def check_argv():
    """Check the command line arguments.

    Builds the argument parser for the downsampling script and parses
    `sys.argv`. When the script is invoked without any argument the help
    text is printed and the process exits with status 1.

    Returns the namespace produced by `parser.parse_args()`.
    """
    # The first line of the module docstring is used as the description.
    # Fall back to an empty string when the module has no docstring, so the
    # parser can still be built instead of failing on `None.strip()`.
    description = (__doc__ or "").strip().split("\n")[0]
    parser = argparse.ArgumentParser(description=description, add_help=False)
    parser.add_argument("input_npz_fn", type=str, help="input speech file")
    parser.add_argument("output_npz_fn", type=str, help="output embeddings file")
    parser.add_argument("n", type=int, help="number of samples")
    parser.add_argument(
        "--technique", choices=["interpolate", "resample", "rasanen"],
        default="resample"
        )
    parser.add_argument(
        "--frame_dims", type=int, default=None,
        help="only keep these number of dimensions"
        )
    if len(sys.argv) == 1:
        parser.print_help()
        sys.exit(1)
    return parser.parse_args()
#-----------------------------------------------------------------------------#
# MAIN FUNCTION #
#-----------------------------------------------------------------------------#
def get_intervals(pcg, transitions=None, interval='RR', resize=None):
    """
    Slice `pcg` into the intervals of the requested type.

    The start and end indices come from `boundaries(transitions, interval)`;
    each (start, end) pair yields the slice `pcg[start:end]`.

    Args:
        pcg : numpy array
        transitions : tuple( numpy array begin, numpy array end)
            begin - indices where the intervals start
            end - indices where the intervals end
        interval : string
            Type of interval [RR, S1, Sys, S2, Dia]. Defaults to RR
        resize : int
            when truthy, every interval is resampled to this length

    Returns:
        intervals : list<numpy array>
            list of intervals of the specified type
    """
    segments = []
    for start, end in zip(*boundaries(transitions, interval)):
        segments.append(pcg[start:end])
    if not resize:
        return segments
    return [resample(segment, resize) for segment in segments]
def get_intervals(pcg, transitions=None, interval='RR', resize=None):
    """
    Cut `pcg` into the intervals of one type, optionally resampling them.

    `boundaries(transitions, interval)` supplies the start indices and the
    end indices; they are paired up and each pair selects `pcg[begin:end]`.

    Args:
        pcg : numpy array
        transitions : tuple( numpy array begin, numpy array end)
            begin - indices where the intervals start
            end - indices where the intervals end
        interval : string
            Type of interval [RR, S1, Sys, S2, Dia]. Defaults to RR
        resize : int
            resample the interval to a specified length (skipped when falsy)

    Returns:
        intervals : list<numpy array>
            list of intervals of the specified type
    """
    index_pairs = zip(*boundaries(transitions, interval))
    pieces = [pcg[begin:end] for begin, end in index_pairs]
    if resize:
        pieces = [resample(piece, resize) for piece in pieces]
    return pieces
def _resample_rdp(self, n):
"""
Resample Radial Density Profile
:param n: int
:return: nothing
"""
columns = []
for i in xrange(n):
columns += ['edp_{}'.format(i)]
self.feature_names += columns
# edp = [None] * len(self._df)
edp = np.zeros([len(self._df), n])
for i, v in enumerate(self._df.RadialDensityProfile.values):
x = np.zeros([len(v)])
for j, v_ in enumerate(v):
x[j] = v_[0]
edp[i] = signal.resample(x, n)
df = pd.DataFrame(data=edp, columns=columns, index=self._df.index)
self._df = pd.concat([self._df, df], axis=1)
def _resample_edp(self, n):
"""
Resample Edge Density Profile
:param n: int
:return: nothing
"""
columns = []
for i in xrange(n):
columns += ['edp_{}'.format(i)]
self.feature_names += columns
# edp = [None] * len(self._df)
rdp = np.zeros([len(self._df), n])
for i, v in enumerate(self._df.EdgeDensityProfile.values):
x = np.zeros([len(v)])
for j, v_ in enumerate(v):
x[j] = v_[0]
rdp[i] = signal.resample(x, n)
df = pd.DataFrame(data=rdp, columns=columns, index=self._df.index)
self._df = pd.concat([self._df, df], axis=1)
def simple_resample(data, original_fs, target_fs):
    """Wrap scipy.signal.resample with a simpler API

    `data` is resampled along axis 1. The output length is the input length
    along that axis scaled by `target_fs / original_fs`, truncated to an int.
    """
    n_in = data.shape[1]
    n_out = int(n_in * target_fs / original_fs)
    return resample(data, num=n_out, axis=1)
def get_name(self):
    """Return the identifier of this transform: 'resample' followed by `self.f`."""
    target = self.f
    return "resample%d" % target
def apply(self, data):
    """Resample `data` along its last axis down to `self.f` samples.

    Input whose last axis is not longer than `self.f` is returned unchanged
    (the same object, no copy).
    """
    if data.shape[-1] <= self.f:
        return data
    last_axis = data.ndim - 1
    return resample(data, self.f, axis=last_axis)
def get_name(self):
    """Return the identifier of this transform: 'resample', `self.f`, then 'hanning'."""
    target = self.f
    return "resample%dhanning" % target
def apply(self, data):
    """Resample `data` along its last axis to `self.f` samples with a Hann window.

    The window is passed to `resample` by name. `resample` treats an *array*
    window as already laid out in the Fourier domain (DC bin first) and
    multiplies the spectrum by it as-is; a symmetric `hann(M)` array starts
    at zero, so it removed the DC component and attenuated the low
    frequencies instead of tapering the high ones. Passing the window name
    lets `resample` build it and shift it into FFT order itself.
    """
    axis = data.ndim - 1
    out = resample(data, self.f, axis=axis, window="hann")
    return out
def DTW_features(pcg, transitions, interval='RR', constraint='sakoe_chiba', k=0.1, norm="resample", pre=None, downsample_rate=2, suf=None, sigma=None):
    """Summarise the pairwise DTW distances between intervals of `pcg`.

    The signal is preprocessed with `dtw_preprocess`, cut into intervals of
    type `interval` (resized to `ALENGTH[interval]` when `norm` is
    'resample'), and each interval is then resampled to
    `len(interval) // downsample_rate` samples. `dtw_distances` provides the
    pairwise distance matrix.

    Args:
        pcg: input signal, forwarded to `dtw_preprocess`.
        transitions: forwarded to `get_intervals`.
        interval: interval type, used as key into `ALENGTH`.
        constraint, k: forwarded to `dtw_distances`; `k` is cast to float.
        norm: 'resample' or 'path'. 'path' enables `path_norm` in
            `dtw_distances`.
        pre: forwarded to `dtw_preprocess`.
        downsample_rate: integer divisor applied to every interval length.
        suf: unused by this function.
        sigma: when truthy, distances are replaced by
            `-affinity(dist_matrix, sigma)`; cast to float when not None.

    Returns:
        numpy array of eight values: mean and standard deviation (from
        `_mean_std`) and the 25/50/75 percentiles of the finite distances to
        the medoid interval, followed by the 25/50/75 percentiles of the
        finite distances between consecutive intervals.

    Raises:
        ValueError: if `norm` is neither 'path' nor 'resample'.
    """
    k = float(k)
    if sigma is not None:
        sigma = float(sigma)
    # Validate before doing any work so a bad `norm` fails immediately
    # instead of after preprocessing and resampling the whole signal.
    if norm not in ['path', 'resample']:
        raise ValueError("Invalid normalization {0}".format(norm))
    path_norm = norm == 'path'

    pcg = dtw_preprocess(pcg, pre=pre)
    resize = ALENGTH[interval] if norm == 'resample' else None
    intervals = get_intervals(pcg, transitions, interval=interval, resize=resize)
    intervals = [resample(i, len(i) // downsample_rate) for i in intervals]

    dist_matrix = dtw_distances(intervals, n_jobs=-1, constraint=constraint, k=k, path_norm=path_norm, normalize=True)
    dist_matrix = finite_matrix(dist_matrix)
    # The medoid is the interval with the smallest summed distance to all
    # others; it is chosen before the optional affinity transform.
    medoid_index = np.argmin(np.sum(dist_matrix, axis=0))
    if sigma:
        dist_matrix = -affinity(dist_matrix, sigma)

    medoid_distances = dist_matrix[:, medoid_index]
    # Remove the infinite distances
    medoid_distances = medoid_distances[np.isfinite(medoid_distances)]
    m_MDTW, s_MDTW = _mean_std(medoid_distances)
    Q1_MDTW, Q2_MDTW, Q3_MDTW = np.percentile(medoid_distances, [25, 50, 75])

    # Distances between each interval and the one that follows it.
    contiguous_distances = np.array([dist_matrix[i, i + 1] for i in np.arange(len(dist_matrix) - 1)])
    contiguous_distances = contiguous_distances[np.isfinite(contiguous_distances)]
    Q1_CDTW, Q2_CDTW, Q3_CDTW = np.percentile(contiguous_distances, [25, 50, 75])

    features = np.array([m_MDTW, s_MDTW, Q1_MDTW, Q2_MDTW, Q3_MDTW, Q1_CDTW, Q2_CDTW, Q3_CDTW])
    return features
def cinter_DTW_features(pcg, transitions, templates='random_templates', pre=None, constraint='sakoe_chiba', k=0.1, suf=None, sigma=None):
    """Mean and standard deviation of DTW distances to templates, per interval type.

    Templates are loaded with `custom_loadmat(TEMPLATE_FOLDER + templates)`.
    For each of the interval types RR, S1, Sys, S2 and Dia the intervals of
    `pcg` are resized to `ALENGTH[interval] // interDTW_down`, at most the
    first 50 are kept, and `dtw_distances` is evaluated against the templates
    of that type. When `sigma` is truthy the distances are replaced by
    `-affinity(dist_matrix, sigma)`.

    Returns a list of ten values: the five means (RR, S1, Sys, S2, Dia)
    followed by the five standard deviations in the same order.

    `suf` is not used by this function.
    """
    k = float(k)
    if sigma is not None:
        sigma = float(sigma)
    templates = custom_loadmat(TEMPLATE_FOLDER + templates)
    # NOTE(review): the names are attached to `inter_DTW_features`, not to
    # this function -- confirm that is intended.
    inter_DTW_features.names = ["%s_d%02d" % (q, i) for q in ['mean', 'std'] for i, _ in enumerate(templates)]
    pcg = dtw_preprocess(pcg, pre=pre)

    means = []
    stds = []
    for interval in ['RR', 'S1', 'Sys', 'S2', 'Dia']:
        reference = templates[interval]
        target_len = ALENGTH[interval] // interDTW_down
        segments = get_intervals(pcg, transitions, interval=interval, resize=target_len)
        segments = [resample(segment, target_len) for segment in segments][:50]
        dist_matrix = finite_matrix(
            dtw_distances(segments, reference, n_jobs=-1, constraint=constraint, k=k)
        )
        if sigma:
            dist_matrix = -affinity(dist_matrix, sigma)
        means.append(np.mean(dist_matrix))
        stds.append(np.std(dist_matrix))
    return means + stds
def preprocess_sample(aud_sample,rate):
    """Resample, rescale and de-noise a speech sample.

    aud_sample : sequence of audio samples
    rate : sampling rate of `aud_sample`

    Returns the processed sample produced by `noise_removal`.
    """
    # Step 0: Pre-process the speech sample
    # a. Resample from `rate` to SAMPLING_RATE
    #    NOTE(review): the original comment said "8 MHz"; presumably 8 kHz
    #    was meant -- confirm against the value of SAMPLING_RATE.
    # b. Normalization [Apply gain s.t the sample data is in the range [-1.0, 1.0]
    # c. Noise Cancellation
    # `signal.resample` needs an integer sample count; true division would
    # otherwise hand it a float.
    n_samples = int(len(aud_sample) * SAMPLING_RATE / rate)
    proc_sample = signal.resample(aud_sample, n_samples)
    # Values above 1.0 are scaled down by 2**15.
    # assumes 16-bit integer input -- TODO confirm
    if np.max(proc_sample) > 1.0:
        proc_sample = proc_sample*1.0/pow(2, 15)
    proc_sample = noise_removal(proc_sample)
    return proc_sample
def resample_gnuradio(file: str, ratio: float):
    """ ratio is current_fs / desired_fs

    Loads `file` with `load_gnuradio_file` and resamples it to
    `int(len(f) * ratio)` samples.

    NOTE(review): the output length is the input length *multiplied* by
    `ratio`, so a ratio above 1 produces more samples; that does not match
    "current_fs / desired_fs" as stated above -- confirm which is intended.
    """
    f = load_gnuradio_file(file)
    # `ratio` is a float, so the product must be truncated to an integer
    # sample count before it is handed to `signal.resample`.
    resampled = signal.resample(f, int(len(f) * ratio))
    return resampled
def XsSeg2XaePhon(Xs, Xs_mask, segs, maxLen, nResample=None):
    """Split utterances into segments and pack each segment into a fixed-size array.

    For every utterance in `Xs` the frames flagged in `Xs_mask` are dropped
    and the remaining frames are split at the positions where `segs` is
    non-zero. Each segment is clipped to `maxLen` frames and right-aligned in
    a zero array. When `nResample` is given, the clipped segment is instead
    brought to exactly `nResample` frames: segments longer than one frame are
    resampled, single-frame segments are repeated.

    Returns a tuple `(Xae_phon, deletedChars, oneLetter)`:
        Xae_phon : stacked segments, `nResample` (or `maxLen`) frames each
        deletedChars : per segment, number of frames clipped beyond `maxLen`
        oneLetter : per segment, 1 if the clipped segment has a single frame
    """
    FRAME_SIZE = Xs.shape[-1]
    out_len = nResample if nResample else maxLen
    deletedChars = []
    oneLetter = []
    Xae_phon = []
    for i, utt in enumerate(np.split(Xs, len(Xs))):
        utt = np.squeeze(utt, 0)[np.logical_not(Xs_mask[i])]
        words = np.split(utt, np.where(segs[i, :len(utt)])[0])
        # A boundary at position 0 yields an empty leading piece.
        if len(words[0]) == 0:
            words.pop(0)
        for word in words:
            w_len = min(len(word), maxLen)
            w_target = np.zeros((out_len, FRAME_SIZE))
            deletedChars.append(max(0, len(word) - maxLen))
            oneLetter.append(int(w_len == 1))
            clipped = word[:w_len]
            if nResample:
                if w_len > 1:
                    clipped = resample(clipped, nResample)
                else:
                    clipped = np.repeat(clipped, nResample, axis=0)
                # The segment now has nResample frames. Using maxLen here
                # made the assignment below fail whenever maxLen < nResample.
                w_len = nResample
            w_target[-w_len:] = clipped
            Xae_phon.append(w_target)
    Xae_phon = np.stack(Xae_phon)
    deletedChars = np.array(deletedChars)
    oneLetter = np.array(oneLetter)
    return Xae_phon, deletedChars, oneLetter
def f0transform(self, x, completion=False):
    """Transform F0 of given waveform signals using WSOLA and resampling

    The duration of `x` is first modified with
    `self.wsola.duration_modification`; the result is then resampled back to
    the original number of samples. The input length is stored in
    `self.xlen`.

    Parameters
    ---------
    x : array, shape ('len(x)')
        array of waveform sequence
    completion : bool, optional
        Completion of the high frequency range of the F0 transformed
        waveform, delegated to `self._high_frequency_completion`.
        It compensates for the high frequency range that is lost by
        resampling when the F0 ratio is smaller than 1.0.

    Returns
    ---------
    transformed : array, shape (`len(x)`)
        Array of F0 transformed waveform sequence

    Raises
    ---------
    ValueError
        If `completion` is enabled while `self.f0rate` is larger than 1.0.
    """
    self.xlen = len(x)

    # WSOLA followed by resampling back to the input length
    stretched = self.wsola.duration_modification(x)
    transformed = resample(stretched, self.xlen)
    if not completion:
        return transformed

    # Frequency completion when decreasing F0 of the waveform
    if self.f0rate > 1.0:
        raise ValueError("Do not enable completion if f0rate > 1.")
    return self._high_frequency_completion(x, transformed)
def resampleToLengthN(v, n): # can't guarantee length = exactly n
    """Resample a 1D array, or every row of a 2D array, to roughly `n` samples.

    A 1D array is returned unchanged when `int(len(v) / n) <= 1`, i.e. when
    it is shorter than `2 * n`; otherwise it is resampled to exactly `n`
    samples with `signal.resample`. For a 2D array the same rule is applied
    to each row and the rows are stacked again.

    For arrays with more than two dimensions an error message is printed and
    None is returned.
    """
    if len(v.shape) == 1:
        factor = int(len(v) / float(n))
        if factor <= 1:
            return v
        return signal.resample(v, n)
    elif len(v.shape) == 2: # resample each row
        # Build a real list: on Python 3 `map` returns an iterator, which
        # np.vstack does not accept.
        rows = [resampleToLengthN(row.flatten(), n) for row in v]
        return np.vstack(rows)
    else:
        print("Error: can't resample tensor to length N!")
def warpedSeq(seq, sameLength=True, useI=True, **kwargs):
    """Warp `seq` along a random warping path.

    `randWarpingPath(len(seq), **kwargs)` supplies (i, j) index pairs. The i
    values (or the j values when `useI` is false) are used to index `seq`.
    When `sameLength` is true the warped sequence is resampled back to
    `len(seq)` samples.
    """
    path = randWarpingPath(len(seq), **kwargs)  # list of (i,j) pairs
    i_vals, j_vals = zip(*path)  # tuple of i vals, tuple of j vals
    chosen = i_vals if useI else j_vals
    warped = seq[np.asarray(chosen)]
    if not sameLength:
        return warped
    return signal.resample(warped, len(seq))
def createAdversarialSignal(startIdxs, endIdxs, signalLength):
    """Build a signal in which randomly paired instances hold negated copies of each other.

    The instances (given by their start and exclusive end indices) are
    shuffled and grouped into pairs. With an odd number of instances one
    randomly chosen instance is used twice. For every pair, a `randwalk` of
    the first instance's length is written into the first instance, and its
    negation (resampled when the two lengths differ) into the second.

    Returns an array of length `signalLength` that is zero outside the
    instances.
    """
    assert(len(startIdxs) == len(endIdxs))
    order = np.arange(len(startIdxs))
    if len(order) % 2 != 0:  # handle odd numbers of instances
        order = np.append(order, np.random.choice(order))  # duplicate some idx
    np.random.shuffle(order)

    out = np.zeros(signalLength)
    for first, second in order.reshape((-1, 2)):  # random pairs
        start1, end1 = startIdxs[first], endIdxs[first]
        start2, end2 = startIdxs[second], endIdxs[second]
        length1 = end1 - start1  # end idxs not inclusive
        length2 = end2 - start2

        subseq = randwalk(length1)
        negSeq = -subseq
        if length1 != length2:
            negSeq = signal.resample(negSeq, length2)
        out[start1:end1] = subseq
        out[start2:end2] = negSeq
    return out
# ================================================================
# Create particular synthetic datasets (for prototyping / smoke testing)
# ================================================================
def main():
    """Downsample every array of an input .npz file and write the results.

    Each array is restricted to the first `args.frame_dims` columns,
    transposed, reduced to `args.n` frames with the technique selected on the
    command line ("interpolate", "resample" or "rasanen"), flattened with the
    module-level `flatten_order`, and stored under the same key in a
    compressed output .npz file.
    """
    args = check_argv()

    # Single-argument print calls produce the same output on Python 2 and 3.
    print("Reading: %s" % args.input_npz_fn)
    input_npz = np.load(args.input_npz_fn)
    # list(...) so the first key can be indexed on Python 3 as well.
    d_frame = input_npz[list(input_npz.keys())[0]].shape[1]

    print("Frame dimensionality: %s" % d_frame)
    if args.frame_dims is not None and args.frame_dims < d_frame:
        d_frame = args.frame_dims
        print("Reducing frame dimensionality: %s" % d_frame)

    print("Downsampling: %s" % args.technique)
    output_npz = {}
    for key in input_npz:

        # Limit input dimensionality
        y = input_npz[key][:, :args.frame_dims].T

        # Downsample
        if args.technique == "interpolate":
            x = np.arange(y.shape[1])
            f = interpolate.interp1d(x, y, kind="linear")
            x_new = np.linspace(0, y.shape[1] - 1, args.n)
            y_new = f(x_new).flatten(flatten_order) #.flatten("F")
        elif args.technique == "resample":
            y_new = signal.resample(y, args.n, axis=1).flatten(flatten_order) #.flatten("F")
        elif args.technique == "rasanen":
            # Taken from Rasenen et al., Interspeech, 2015
            # Keep the largest number of frames that is a multiple of n and
            # average each of the n equally sized groups of frames.
            n_frames_in_multiple = int(np.floor(y.shape[1] / args.n)) * args.n
            y_new = np.mean(
                y[:, :n_frames_in_multiple].reshape((d_frame, args.n, -1)), axis=-1
                ).flatten(flatten_order) #.flatten("F")

        # This was done in Rasenen et al., Interspeech, 2015, but didn't help here
        # last_term = args.n/3. * np.log10(y.shape[1] * 10e-3)  # Not sure if this should be in frames or ms
        # y_new = np.hstack([y_new, last_term])

        # Save result
        output_npz[key] = y_new

    print("Output dimensionality: %s" % output_npz[list(output_npz.keys())[0]].shape[0])

    print("Writing: %s" % args.output_npz_fn)
    np.savez_compressed(args.output_npz_fn, **output_npz)
def sinusoid_analysis(X, input_sample_rate, resample_block=128, copy=True):
    """
    Contruct a sinusoidal model for the input signal.

    The signal is decimated to 8 kHz when needed, analysed with
    ``lpc_analysis`` (order 8) and the LPC result is converted to
    frequencies and magnitudes with ``lpc_to_frequency``.

    Parameters
    ----------
    X : ndarray
        Input signal to model

    input_sample_rate : int
        The sample rate of the input signal

    resample_block : int, optional (default=128)
       Controls the step size of the sinusoidal model

    copy : bool, optional (default=True)
        If True, ``X`` is copied into a new array. If False, ``X`` is
        converted with ``np.asarray``, which copies only when necessary.

    Returns
    -------
    frequencies_hz : ndarray
       Frequencies for the sinusoids, in Hz.

    magnitudes : ndarray
       Magnitudes of sinusoids returned in ``frequencies``

    Raises
    ------
    ValueError
        If ``input_sample_rate`` is not a multiple of 8000.

    References
    ----------
    D. P. W. Ellis (2004), "Sinewave Speech Analysis/Synthesis in Matlab",
    Web resource, available: http://www.ee.columbia.edu/ln/labrosa/matlab/sws/
    """
    # np.array(X, copy=False) raises on NumPy 2 whenever a copy cannot be
    # avoided (e.g. list input); np.asarray keeps the "copy only if needed"
    # meaning on every NumPy version.
    X = np.array(X) if copy else np.asarray(X)
    resample_to = 8000
    if input_sample_rate != resample_to:
        if input_sample_rate % resample_to != 0:
            raise ValueError("Input sample rate must be a multiple of 8k!")
        # Should be able to use resample... ?
        # resampled_count = round(len(X) * resample_to / input_sample_rate)
        # X = sg.resample(X, resampled_count, window=sg.hanning(len(X)))
        X = sg.decimate(X, input_sample_rate // resample_to, zero_phase=True)
    # Step size expressed at the 8 kHz rate, forced to an even number.
    step_size = 2 * round(resample_block / input_sample_rate * resample_to / 2.)
    a, g, e = lpc_analysis(X, order=8, window_step=step_size,
                           window_size=2 * step_size)
    f, m = lpc_to_frequency(a, g)
    f_hz = f * resample_to / (2 * np.pi)
    return f_hz, m
def sinusoid_analysis(X, input_sample_rate, resample_block=128, copy=True):
    """
    Contruct a sinusoidal model for the input signal.

    The signal is decimated to 8 kHz when needed, analysed with
    ``lpc_analysis`` (order 8) and the LPC result is converted to
    frequencies and magnitudes with ``lpc_to_frequency``.

    Parameters
    ----------
    X : ndarray
        Input signal to model

    input_sample_rate : int
        The sample rate of the input signal

    resample_block : int, optional (default=128)
       Controls the step size of the sinusoidal model

    copy : bool, optional (default=True)
        If True, ``X`` is copied into a new array. If False, ``X`` is
        converted with ``np.asarray``, which copies only when necessary.

    Returns
    -------
    frequencies_hz : ndarray
       Frequencies for the sinusoids, in Hz.

    magnitudes : ndarray
       Magnitudes of sinusoids returned in ``frequencies``

    Raises
    ------
    ValueError
        If ``input_sample_rate`` is not a multiple of 8000.

    References
    ----------
    D. P. W. Ellis (2004), "Sinewave Speech Analysis/Synthesis in Matlab",
    Web resource, available: http://www.ee.columbia.edu/ln/labrosa/matlab/sws/
    """
    # np.array(X, copy=False) raises on NumPy 2 whenever a copy cannot be
    # avoided (e.g. list input); np.asarray keeps the "copy only if needed"
    # meaning on every NumPy version.
    X = np.array(X) if copy else np.asarray(X)
    resample_to = 8000
    if input_sample_rate != resample_to:
        if input_sample_rate % resample_to != 0:
            raise ValueError("Input sample rate must be a multiple of 8k!")
        # Should be able to use resample... ?
        # resampled_count = round(len(X) * resample_to / input_sample_rate)
        # X = sg.resample(X, resampled_count, window=sg.hanning(len(X)))
        X = sg.decimate(X, input_sample_rate // resample_to, zero_phase=True)
    # Step size expressed at the 8 kHz rate, forced to an even number.
    step_size = 2 * round(resample_block / input_sample_rate * resample_to / 2.)
    a, g, e = lpc_analysis(X, order=8, window_step=step_size,
                           window_size=2 * step_size)
    f, m = lpc_to_frequency(a, g)
    f_hz = f * resample_to / (2 * np.pi)
    return f_hz, m
def XsSeg2Xae(Xs, Xs_mask, segs, maxUtt, maxLen, nResample=None, check_output=False):
    """Pack the segments of every utterance into a fixed-size array.

    For every utterance in `Xs` the frames flagged in `Xs_mask` are dropped
    and the remaining frames are split into words at the positions where
    `segs` is non-zero. At most `maxUtt` words are kept; they are placed at
    the end of the word axis, so missing words are zero padding at the front.
    Each word is clipped to `maxLen` frames and right-aligned in a zero
    array. When `nResample` is given, the clipped word is instead brought to
    exactly `nResample` frames: words longer than one frame are resampled,
    single-frame words are repeated.

    Returns a tuple `(Xae, deletedChars, oneLetter)`:
        Xae : array (utterances, maxUtt, nResample or maxLen, frame size)
        deletedChars : array (utterances, maxUtt), frames clipped per word,
            plus the frames of the words beyond `maxUtt` spread uniformly
            over all word slots
        oneLetter : array (utterances, maxUtt), 1 where the clipped word has
            a single frame

    With `check_output` the frames of each utterance are rebuilt from `Xae`
    and compared against the source; an AssertionError is raised on a
    mismatch.
    """
    Xae = np.split(Xs, len(Xs))
    FRAME_SIZE = Xs.shape[-1]
    out_len = nResample if nResample else maxLen
    deletedChars = np.zeros((len(Xae), maxUtt))
    oneLetter = np.zeros((len(Xae), maxUtt))
    for i, utt in enumerate(Xae):
        utt_target = np.zeros((maxUtt, out_len, FRAME_SIZE))
        utt = np.squeeze(utt, 0)[np.logical_not(Xs_mask[i])]
        utt = np.split(utt, np.where(segs[i, :len(utt)])[0])
        # A boundary at position 0 yields an empty leading piece.
        if len(utt[0]) == 0:
            utt.pop(0)
        n_words = min(len(utt), maxUtt)
        padwords = maxUtt - n_words
        for j in range(n_words):
            w_len = min(len(utt[j]), maxLen)
            w_target = np.zeros((out_len, FRAME_SIZE))
            deletedChars[i, padwords + j] += max(0, len(utt[j]) - maxLen)
            oneLetter[i, padwords + j] += int(w_len == 1)
            word = utt[j][:w_len]
            if nResample:
                if w_len > 1:
                    word = resample(word, nResample)
                else:
                    word = np.repeat(word, nResample, axis=0)
                # The word now has nResample frames. Using maxLen here made
                # the assignment below fail whenever maxLen < nResample.
                w_len = nResample
            w_target[-w_len:] = word
            utt_target[padwords + j] = w_target
        # Frames of the words that did not fit into maxUtt slots.
        extraWDel = sum(len(extra) for extra in utt[maxUtt:])
        ## Uniformly distribute clipping penalty for excess words
        deletedChars[i, :] += float(extraWDel) / maxUtt
        Xae[i] = utt_target
    Xae = np.stack(Xae)
    ## NOTE: Reconstitution will fail if there has been any clipping.
    ## Do not use this feature unless maxutt and maxlen are large enough
    ## to make clipping very unlikely.
    ## Currently only works in acoustic mode.
    if check_output:
        for i in range(len(Xs)):
            src = Xs[i][np.logical_not(Xs_mask[i])]
            target = Xae[i]
            reconstituted = np.zeros((0, FRAME_SIZE))
            for wi in range(maxUtt):
                # Keep only the frames that are not all-zero padding.
                w = target[wi][np.where(target[wi].any(-1))]
                reconstituted = np.concatenate([reconstituted, w])
            for j in range(len(src)):
                assert np.allclose(src[j], reconstituted[j]), (
                    'Reconstitution of MFCC frames failed at timestep %d.\n'
                    'Source region: %s\n Reconstituted region: %s'
                    % (j, src[j-1:j+2], reconstituted[j-1:j+2])
                )
    return Xae, deletedChars, oneLetter
def demodulate_array(h, soft_lcd):
    """Demodulate the sample array `h`, decode packets and push them to `soft_lcd`.

    The phase difference between consecutive samples of `h` is filtered with
    `basebandBP`, resampled from 256 kHz to 228 kHz, mixed with an
    alternating +1/-1 sequence, low-pass and pulse filtered, and handed to
    `symbol_recovery_24`. Every offset of the recovered sequence is checked
    with `rds_crc`; runs of four hits labelled A, B, C, D that are spaced 26
    positions apart are decoded with `decode_one` and the resulting packets
    are passed to `soft_lcd.update_state`.

    The recovered sequence, the hits and the decoded 'A' hits are printed.
    """
    # primary worker function, credit to all
    sample_products = h[1:] * np.conj(h[:-1])
    phase_steps = np.angle(sample_products)
    baseband = signal.convolve(phase_steps, basebandBP)

    # resample from 256kHz to 228kHz
    rdsBand = signal.resample(baseband, int(len(baseband)*228e3/256e3))

    # length modulo 4
    usable = (len(rdsBand)//4)*4
    rdsBand = rdsBand[:usable]
    c57 = numpy.tile( [1., -1.], len(rdsBand)//4 )
    xi = rdsBand[::2] * c57
    xq = rdsBand[1::2] * (-c57)

    xsfi = signal.convolve(signal.convolve(xi, filtLP), pulseFilt)
    xsfq = signal.convolve(signal.convolve(xq, filtLP), pulseFilt)

    # Drop the last sample when the length is odd so samples pair up.
    if len(xsfi) % 2 == 1:
        xsfi = xsfi[:-1]
        xsfq = xsfq[:-1]

    xdi = (xsfi[::2] + xsfi[1::2]) / 2
    xdq = xsfq[::2]

    res = symbol_recovery_24(xdi, xdq)

    hits = []
    for offset in range(len(res)-26):
        block = rds_crc(res, offset, 26)
        if block:
            hits.append( (offset, block) )
    print(res,hits)
    print([decode_one(res, x[0]) for x in hits if x[1] == 'A'])

    packets = []
    for first in range(len(hits)-3):
        if hits[first][1] != "A":
            continue
        base_offset = hits[first][0]
        # The four hits must be labelled A..D and sit 26 positions apart.
        bogus = any(
            26*j != hits[first+j][0] - base_offset or hits[first+j][1] != sp
            for j, sp in enumerate("ABCD")
        )
        if not bogus:
            packets.extend(decode_one(res, hits[first+j][0]) for j in range(4))

    soft_lcd.update_state(packets)