def _smooth_acf_savgol(acf, windowsize=21, polyorder=2):
'''
This returns a smoothed version of the ACF.
This version uses the Savitsky-Golay smoothing filter
'''
smoothed = savgol_filter(acf, windowsize, polyorder)
return smoothed
# Example source code for the Python savgol_filter() function
def VIsmooth_ref(x):
    """Smooth a vegetation-index (EVI) time series against a reference series.

    Judging by the slicing below, `x` is a 1-D vector of length 138:
    entries 0:92 are observed EVI values and 92:138 the reference
    climatology (assumption — TODO confirm against the caller).
    Returns a 138-element vector: smoothed central year, raw central year,
    and the reference tail.
    """
    # the size of EVIgood is 92*5760000, the size of the reference data is 46*5760000
    # -9999 is the fill value; NOTE: this mutates the caller's array in place.
    x[x == -9999] = np.nan
    EVIgood = x[0:92]
    # Rearrange the reference so it is phase-aligned with the 92 observations.
    reference = np.concatenate([x[115:], x[92:], x[92:115]])
    if np.sum(np.isnan(EVIgood)) == 92:
        # Every observation missing: fall back to a reference-only result.
        # NOTE(review): element ordering differs from the normal return
        # below (x[92:] first instead of the smoothed block) — verify intent.
        return np.concatenate([x[92:], x[23:69], x[92:]])
    ############################
    # here require complicated algorithm
    # first get the difference between these two
    diff = EVIgood - reference
    #fun = cdll.LoadLibrary(os.getcwd() + '/bise.so')
    #outdiff = (c_double * len(EVIgood))()
    #nans, y = nan_helper(diff)
    #diff[nans] = np.interp(y(nans), y(~nans), diff[~nans])
    # Do not apply a correction where the reference has no data.
    diff[reference == 0] = 0
    diff = pd.Series(diff)
    # Linearly fill NaN gaps in the difference, then rebuild the VI series.
    reconstructVI = reference+diff.interpolate()
    SGVI = savgol_filter(np.array(reconstructVI[23:69]), window_length=5, polyorder=3)
    # Vegetation indices cannot be negative; clamp filter undershoot.
    SGVI[SGVI < 0] = 0
    return np.concatenate([SGVI, x[23:69], x[92:]])
def dump_cycle(f, user, ps, symptom, cl):
    """ Takes predicted values, dumps appropriate results accordingly.

    Resamples the predictions onto `cl` evenly spaced points, smooths them
    with a Savitzky-Golay filter, clamps each value into [.001, .99] and
    writes one CSV row per point: ``user,index,symptom,value``.

    :param f: output file (anything with a ``write`` method)
    :param user: user id
    :param ps: predictions for this user (may contain NaNs)
    :param symptom: symptom for which these predictions are for
    :param cl: expected cycle length (number of rows written)
    """
    # Work on a float copy so the caller's array is NOT mutated when NaNs
    # are zero-filled (the original aliased `ps` and overwrote it in place).
    y = np.asarray(ps, dtype=float).copy()
    y[np.isnan(y)] = 0
    x = np.arange(len(y))
    # Resample onto `cl` points spanning the original index range.
    xx = np.linspace(x.min(), x.max(), cl)
    itp = interp1d(x, y, kind='linear')
    window_size, poly_order = 5, 3
    yy_sg = savgol_filter(itp(xx), window_size, poly_order)
    for i in range(int(cl)):
        # Clamp into (0, 1) so downstream probability/log math stays finite.
        lp = np.clip(yy_sg[int(i)], .001, .99)
        f.write("%s,%d,%s,%g\n" % (user, i, symptom, lp))
def savitzky_golay(series, *args, **kwargs):
    """Savitzky-Golay filter a pandas Series, preserving its index.

    Extra positional/keyword arguments are forwarded to
    scipy.signal.savgol_filter (window length, polyorder, ...).
    """
    smoothed_values = savgol_filter(series.values, *args, **kwargs)
    return pandas.Series(data=smoothed_values, index=series.index)
def savgol_filter(array, win=2001, polyn=3, iteration=1, threshold=1):
    """Apply scipy.signal.savgol_filter iteratively.

    Estimates a slowly varying baseline along axis 0, then repeatedly
    rejects signal-like outliers (residuals above ``threshold`` sigma)
    before refitting, so real signals do not bias the baseline.

    Args:
        array (decode.array): Decode array which will be filtered.
        win (int): Length of window.
        polyn (int): Order of polynominal function.
        iteration (int): The number of iterations.
        threshold (float): Threshold above which the data will be used as signals
            in units of sigma.
    Returns:
        fitted (decode.array): Fitted results.
    """
    logger = getLogger('decode.models.savgol_filter')
    logger.warning('Do not use this function. We recommend you to use dc.models.pca instead.')
    logger.info('win polyn iteration threshold')
    logger.info('{} {} {} {}'.format(win, polyn, iteration, threshold))
    ### 1st estimation: fit the baseline along the time axis; residual = data - baseline
    array = array.copy()
    fitted = signal.savgol_filter(array, win, polyn, axis=0)
    filtered = array - fitted
    ### nth iteration: clip signal-like residuals, then refit the baseline
    for i in range(iteration):
        sigma = filtered.std(axis=0)
        # Samples whose residual exceeds threshold*sigma are treated as signal.
        mask = (filtered >= threshold * sigma)
        masked = np.ma.array(filtered, mask=mask)
        ### replace the masked (signal-like) residuals with threshold * sigma
        filled = masked.filled(threshold * sigma)
        ### add the baseline back onto the clipped residuals and refit with savgol_filter
        clipped = filled + fitted
        fitted = signal.savgol_filter(clipped, win, polyn, axis=0)
        filtered = array - fitted
    return fitted
def dataframeapply(df):
    """Pad, smooth, and Savitzky-Golay filter a vegetation-index matrix.

    The matrix is padded with half a year on each side (rows 23:46 in
    front, rows 0:23 behind) so the smoother has context at the edges,
    smoothed column-wise with ``VIsmooth``, linearly interpolated, and
    SG-filtered.  Only the central (original-year) rows 23:69 are
    returned, clamped at zero.
    """
    df = pd.DataFrame(np.concatenate([df[23:46, :], df, df[:23, :]]))
    df_smoothed = df.apply(VIsmooth)
    df_smoothed = df_smoothed.interpolate(axis=0)
    # make a SG filter
    # .values replaces DataFrame.as_matrix(), which was removed in pandas 1.0.
    df_select = df_smoothed.values[23:69, :]
    df_select[np.isnan(df_select)] = 0
    # NOTE(review): no axis is given, so smoothing runs along rows
    # (axis=-1), unlike sep_sg which uses axis=0 — verify that's intended.
    bisendviSG = savgol_filter(df_select, window_length=5, polyorder=3)
    # Vegetation indices cannot be negative; clamp filter undershoot.
    bisendviSG[bisendviSG < 0] = 0
    return bisendviSG
def sep_sg(df):
    """Zero-fill NaNs in `df` (in place) and Savitzky-Golay smooth each
    column (window 5, cubic polynomial)."""
    nan_mask = np.isnan(df)
    df[nan_mask] = 0
    smoothed = savgol_filter(df, axis=0, window_length=5, polyorder=3)
    return smoothed
def savitzkyGolay(s, *args, **kwargs):
    """Savitzky-Golay filter `s` along its first axis; all other arguments
    are forwarded to scipy.signal.savgol_filter."""
    filtered = spsig.savgol_filter(s, *args, axis=0, **kwargs)
    return filtered
def fcn(self, data_in):
    """Return `data_in` minus its Savitzky-Golay baseline, fit along the
    last axis with this instance's `win_size` and `order` settings."""
    estimated_baseline = _sg(data_in, window_length=self.win_size,
                             polyorder=self.order, axis=-1)
    return data_in - estimated_baseline
def fcn(self, data_in):
    """
    If return list, [0] goes to original, [1] goes to affected
    """
    params = self.parameters
    baseline = _sg(data_in,
                   window_length=params['window_length'],
                   polyorder=params['polyorder'],
                   axis=-1)
    detrended = data_in - baseline
    return [baseline, detrended]
def errorCorrectHSData(self, hsdatacls):
    """Interactively scale hyperspectral data by a Savitzky-Golay baseline.

    Shows a plot-effect dialog on 10 random real-valued spectra; if the
    user accepts, every spectrum in `hsdatacls` is divided by the
    SG-smoothed version of its real part (multiplicative correction).

    Returns a list describing the operation (for a processing/undo log),
    or None if the dialog was cancelled.
    """
    # Preview on a small random subset so the dialog stays responsive.
    temp_spectra = hsdatacls._get_rand_spectra_real(10, pt_sz=3, quads=True)
    plugin = widgetSG()
    result = DialogPlotEffect.dialogPlotEffect(temp_spectra, x=hsdatacls.freqvec,
                                               plugin=plugin, xlabel='Wavenumber (cm$^{-1}$)',
                                               ylabel='Real {$\chi_R$} (au)', show_difference=True)
    if result is not None:
        # User-confirmed filter settings from the dialog.
        win_size = result.win_size
        order = result.order
        detrend_ct = 0
        detrend_tot = hsdatacls.mlen * hsdatacls.nlen
        # Most efficient system
        if len(hsdatacls.pixrange) != 0:
            # Operate only on the user-selected pixel range.
            data_out = hsdatacls.spectrafull[:,:, hsdatacls.pixrange[0]:hsdatacls.pixrange[1]+1]
        else:
            data_out = hsdatacls.spectrafull
        start = _ti.default_timer()
        # Divide by the smoothed real part (multiplicative error correction).
        correction = (1/_sg(data_out.real, window_length=win_size, polyorder=order, axis=-1))
        data_out = data_out*correction
        stop = _ti.default_timer()
        # print('2: {}'.format(stop2-start2))
        # print('3: {}'.format(stop3-start3))
        # print('4: {}'.format(stop4-start4))
        # print('5: {}'.format(stop5-start5))
        #
        print('Scaled {} spectra ({:.5f} sec/spect)'.format(detrend_tot, (stop-start)/(hsdatacls.mlen*hsdatacls.nlen)))
        hsdatacls.spectra = data_out
        return ['Scaling','Type', 'SG', 'win_size', win_size, 'order', order]
    else:
        return None
def deNoiseNRB(self):
    """
    Denoise NRB with Savitky-Golay
    """
    # Range of pixels to perform-over
    rng = self.hsi.freq.op_range_pix
    plugin = _widgetSG(window_length=11, polyorder=3)
    # Let the user tune window/polyorder interactively on the mean spectrum.
    winPlotEffect = _DialogPlotEffect.dialogPlotEffect(self.nrb.mean()[rng],
                                                       x=self.hsi.f,
                                                       plugin=plugin,
                                                       parent=self)
    if winPlotEffect is not None:
        # User-confirmed filter settings from the dialog.
        win_size = winPlotEffect.parameters['window_length']
        order = winPlotEffect.parameters['polyorder']
        # Deep-copy so stored NRB is replaced only after smoothing succeeds.
        nrb_denoise = _copy.deepcopy(_np.squeeze(self.nrb.data))
        # Smooth only the operating pixel range along the spectral axis.
        nrb_denoise[..., rng] = _sg(nrb_denoise[..., rng], win_size, order)
        self.nrb.data = nrb_denoise
        # Backup for Undo
        self.bcpre.add_step(['DenoiseNrbSG',
                             'Win_size', win_size,
                             'Order', order])
        # if self.ui.actionUndo_Backup_Enabled.isChecked():
        #     try:
        #         _BCPre.backup_pickle(self.hsi, self.bcpre.id_list[-1])
        #     except:
        #         print('Error in pickle backup (Undo functionality)')
        #     else:
        #         self.bcpre.backed_up()
        self.changeSlider()
def deNoiseDark(self):
    """
    Denoise Dark with Savitky-Golay
    """
    # Range of pixels to perform-over
    rng = self.hsi.freq.op_range_pix
    plugin = _widgetSG(window_length=201, polyorder=3)
    # Let the user tune window/polyorder interactively on the mean spectrum.
    winPlotEffect = _DialogPlotEffect.dialogPlotEffect(self.dark.mean()[rng],
                                                       x=self.hsi.f,
                                                       plugin=plugin,
                                                       parent=self)
    if winPlotEffect is not None:
        # User-confirmed filter settings from the dialog.
        win_size = winPlotEffect.parameters['window_length']
        order = winPlotEffect.parameters['polyorder']
        # Deep-copy so stored dark is replaced only after smoothing succeeds.
        dark_denoise = _copy.deepcopy(_np.squeeze(self.dark.data))
        # Smooth only the operating pixel range along the spectral axis.
        dark_denoise[..., rng] = _sg(dark_denoise[..., rng], win_size, order)
        self.dark.data = dark_denoise
        # Backup for Undo
        self.bcpre.add_step(['DenoiseDarkSG',
                             'Win_size', win_size,
                             'Order', order])
        # if self.ui.actionUndo_Backup_Enabled.isChecked():
        #     try:
        #         _BCPre.backup_pickle(self.hsi, self.bcpre.id_list[-1])
        #     except:
        #         print('Error in pickle backup (Undo functionality)')
        #     else:
        #         self.bcpre.backed_up()
        self.changeSlider()
def find_resonance(frec,S11,margin=31,doplots=False):
    """
    Returns the resonance frequency from maximum of the derivative of the phase
    """
    # Smooth the real and imaginary quadratures separately for initial guesses
    re_smooth = np.array(smooth(S11.real, margin, 3))
    im_smooth = np.array(smooth(S11.imag, margin, 3))
    s11_smooth = np.array([r + 1j*q for r, q in zip(re_smooth, im_smooth)])
    # Smoothed, unwrapped phase (2pi jumps removed) and its discrete derivative
    phase = np.unwrap(np.angle(s11_smooth))
    dphase = np.diff(phase)
    # Resonance index = largest squared deviation of the phase derivative
    # from its mean, keeping `margin` samples clear of both edges
    mean_dphase = np.average(dphase)
    errvec = [np.power(d - mean_dphase, 2.) for d in dphase]
    ires = margin + np.argmax(errvec[margin:-margin])
    f0i = frec[ires]
    print("Max index: ",ires," Max frequency: ",f0i)
    if doplots:
        plt.title('Original signal (Re,Im)')
        plt.plot(frec, S11.real)
        plt.plot(frec, S11.imag)
        plt.show()
        plt.plot(np.angle(s11_smooth))
        plt.title('Smoothed Phase')
        plt.axis('auto')
        plt.show()
        plt.plot(dphase[margin:-margin])
        plt.plot(dphase)
        plt.title('Diff of Smoothed Phase')
        plt.show()
        plt.title('Smoothed (ph-phavg)\^2')
        plt.plot(errvec)
        plt.plot([ires], [errvec[ires]], 'ro')
        plt.show()
    return f0i
def diff_find_resonance(frec,diffS11,margin=31,doplots=False):
    """
    Returns the resonance frequency from maximum of the derivative of the phase
    """
    # Smooth real and imaginary parts independently before taking the phase
    re_smooth = np.array(smooth(diffS11.real, margin, 3))
    im_smooth = np.array(smooth(diffS11.imag, margin, 3))
    s11_smooth = np.array([r + 1j*q for r, q in zip(re_smooth, im_smooth)])
    # Unwrapped smoothed phase and its discrete derivative
    phase = np.unwrap(np.angle(s11_smooth))
    dphase = np.diff(phase)
    # Resonance index = largest squared deviation of the phase derivative
    # from its mean, ignoring `margin` samples at both ends
    mean_dphase = np.average(dphase)
    errvec = [np.power(d - mean_dphase, 2.) for d in dphase]
    ires = margin + np.argmax(errvec[margin:-margin])
    f0i = frec[ires]
    print("Max index: ",ires," Max frequency: ",f0i)
    if doplots:
        plt.title('Original signal (Re,Im)')
        plt.plot(frec, diffS11.real)
        plt.plot(frec, diffS11.imag)
        plt.plot(frec, np.abs(diffS11))
        plt.show()
        plt.plot(np.angle(s11_smooth))
        plt.title('Smoothed Phase')
        plt.axis('auto')
        plt.show()
        plt.plot(dphase[margin:-margin])
        plt.title('Diff of Smoothed Phase')
        plt.show()
        plt.title('Smoothed (ph-phavg)\^2')
        plt.plot(errvec)
        plt.plot([ires], [errvec[ires]], 'ro')
        plt.show()
    return f0i
def valid_curvesmooth(valid_list, number):
    """Smooth a validation curve with a cubic Savitzky-Golay filter of
    window `number`; curves shorter than the window are returned as-is."""
    if len(valid_list) >= number:
        return savgol_filter(valid_list, number, 3)
    return valid_list
def valid_curvesmooth(valid_list, number):
    """Return `valid_list` smoothed by a cubic Savitzky-Golay filter with
    window length `number`; inputs shorter than the window pass through."""
    too_short = len(valid_list) < number
    smoothed = valid_list if too_short else savgol_filter(valid_list, number, 3)
    return smoothed
def _nan_extend_edges_and_interpolate(xs, X):
    """NaNs at the edges are handled as with savgol_filter mode nearest:
    the edge values are extended/interpolated.  NaNs in the middle are
    interpolated so they do not propagate through the filter.

    Returns the (possibly interpolated) data and the original NaN mask,
    or (X, None) untouched when there were no NaNs.
    """
    nan_mask = np.isnan(X)
    if not np.any(nan_mask):
        return X, None
    patched = X.copy()
    _fill_edges(patched)
    patched = interp1d_with_unknowns_numpy(xs, patched, xs)
    return patched, nan_mask
def __call__(self, data):
    """Apply Savitzky-Golay preprocessing to a data table.

    The table is converted to this preprocessor's domain if needed, its
    features are sorted by x-value, NaNs are interpolated so the filter
    does not propagate them, and the output keeps NaN wherever the
    original data had NaN.
    """
    if data.domain != self.domain:
        data = data.from_table(self.domain, data)
    # Sort features by wavelength; xsind/mon allow restoring the order later.
    xs, xsind, mon, X = _transform_to_sorted_features(data)
    X, nans = _nan_extend_edges_and_interpolate(xs[xsind], X)
    X = savgol_filter(X, window_length=self.window,
                      polyorder=self.polyorder,
                      deriv=self.deriv, mode="nearest")
    # set NaNs where there were NaNs in the original array
    if nans is not None:
        X[nans] = np.nan
    return _transform_back_to_features(xsind, mon, X)
def perform_map_analysis(self):
    """Smooth and background-subtract the current Auger map in place.

    Operates on ``self.current_auger_map`` (indexed as
    [energy, ?, y, x, detector] judging by the slicing below — TODO
    confirm), applying in order: spectral smoothing (Gaussian or
    Savitzky-Golay), per-detector kinetic-energy background subtraction
    (power-law or linear), and optional Tougaard background subtraction.
    """
    # Smoothing (presently performed for individual detectors)
    # FIX: MAY NEED A SUM MAP THAT ALIGNS DETECTOR CHANNELS AND SUMS
    sigma_spec = self.settings['spectral_smooth_gauss_sigma'] # In terms of frames?
    width_spec = self.settings['spectral_smooth_savgol_width'] # In terms of frames?
    order_spec = self.settings['spectral_smooth_savgol_order']
    if self.settings['spectral_smooth_type'] == 'Gaussian':
        print('spectral smoothing...')
        # Smooth along the spectral (first) axis only.
        self.current_auger_map = gaussian_filter(self.current_auger_map, (sigma_spec,0,0,0,0))
    elif self.settings['spectral_smooth_type'] == 'Savitzky-Golay':
        # Currently always uses 4th order polynomial to fit
        print('spectral smoothing...')
        # 1 + 2*width guarantees the odd window length savgol requires.
        self.current_auger_map = savgol_filter(self.current_auger_map, 1 + 2*width_spec, order_spec, axis=0)
    # Background subtraction (implemented detector-wise currently)
    # NOTE: INSUFFICIENT SPATIAL SMOOTHING MAY GIVE INACCURATE OR EVEN INF RESULTS
    if not(self.settings['subtract_ke1'] == 'None'):
        print('Performing background subtraction...')
        for iDet in range(self.current_auger_map.shape[-1]):
            # Fit a power law to the background
            # get background range
            ke_min = self.settings['ke1_start']
            ke_max = self.settings['ke1_stop']
            fit_map = (self.ke[iDet] > ke_min) * (self.ke[iDet] < ke_max)
            ke_to_fit = self.ke[iDet,fit_map]
            spec_to_fit = self.current_auger_map[fit_map,0,:,:,iDet].transpose(1,2,0)
            # NOTE(review): `bg` is unbound if subtract_ke1 is neither
            # 'Power Law' nor 'Linear' — verify the settings UI forbids that.
            if self.settings['subtract_ke1'] == 'Power Law':
                # Fit power law
                A, m = self.fit_powerlaw(ke_to_fit, spec_to_fit)
                # Broadcast fitted parameters over the full energy axis.
                ke_mat = np.tile(self.ke[iDet], (spec_to_fit.shape[0],spec_to_fit.shape[1],1)).transpose(2,0,1)
                A = np.tile(A, (self.ke.shape[1], 1, 1))
                m = np.tile(m, (self.ke.shape[1], 1, 1))
                bg = A * ke_mat**m
            elif self.settings['subtract_ke1'] == 'Linear':
                # Fit line
                m, b = self.fit_line(ke_to_fit, spec_to_fit)
                ke_mat = np.tile(self.ke[iDet], (spec_to_fit.shape[0],spec_to_fit.shape[1],1)).transpose(2,0,1)
                m = np.tile(m, (self.ke.shape[1], 1, 1))
                b = np.tile(b, (self.ke.shape[1], 1, 1))
                bg = m * ke_mat + b
            self.current_auger_map[:,0,:,:,iDet] -= bg
    if self.settings['subtract_tougaard']:
        R_loss = self.settings['R_loss']
        E_loss = self.settings['E_loss']
        # Energy step from the first detector's axis.
        dE = self.ke[0,1] - self.ke[0,0]
        # Always use a kernel out to 3 * E_loss to ensure enough feature size
        ke_kernel = np.arange(0, 3*E_loss, abs(dE))
        if not np.mod(len(ke_kernel),2) == 0:
            # Extend by one sample so the kernel length is even.
            ke_kernel = np.arange(0, 3*E_loss+dE, abs(dE))
        # Tougaard universal loss-function kernel.
        self.K_toug = (8.0/np.pi**2)*R_loss*E_loss**2 * ke_kernel / ((2.0*E_loss/np.pi)**2 + ke_kernel**2)**2
        # Normalize the kernel so the its area is equal to R_loss
        self.K_toug /= (np.sum(self.K_toug) * dE)/R_loss
        # Subtract the convolved inelastic background along the energy axis.
        self.current_auger_map -= dE * correlate1d(self.current_auger_map, self.K_toug,
                                                   mode='nearest', origin=-len(ke_kernel)//2, axis=0)
def perform_spectral_analysis(self):
    """Smooth and background-subtract the 1-D total spectrum in place.

    Mirrors ``perform_map_analysis`` but operates on ``self.total_spec``
    against the interpolated energy axis ``self.ke_interp``.
    """
    # FIX: Consolidate with map analysis
    # Performs same analysis functions as the map, but just on the single (1D) total spectrum
    # Smoothing (presently performed for individual detectors)
    sigma_spec = self.settings['spectral_smooth_gauss_sigma'] # In terms of frames?
    width_spec = self.settings['spectral_smooth_savgol_width'] # In terms of frames?
    order_spec = self.settings['spectral_smooth_savgol_order']
    if self.settings['spectral_smooth_type'] == 'Gaussian':
        print('spectral smoothing...')
        self.total_spec = gaussian_filter(self.total_spec, sigma_spec)
    elif self.settings['spectral_smooth_type'] == 'Savitzky-Golay':
        # Currently always uses 4th order polynomial to fit
        print('spectral smoothing...')
        # 1 + 2*width guarantees the odd window length savgol requires.
        self.total_spec = savgol_filter(self.total_spec, 1 + 2*width_spec, order_spec)
    # Background subtraction (implemented detector-wise currently)
    # NOTE: INSUFFICIENT SPATIAL SMOOTHING MAY GIVE INACCURATE OR EVEN INF RESULTS
    if not(self.settings['subtract_ke1'] == 'None'):
        print('Performing background subtraction...')
        # Fit a power law to the background
        # get background range
        ke_min = self.settings['ke1_start']
        ke_max = self.settings['ke1_stop']
        fit_map = (self.ke_interp > ke_min) * (self.ke_interp < ke_max)
        ke_to_fit = self.ke_interp[fit_map]
        spec_to_fit = self.total_spec[fit_map]
        # NOTE(review): `bg` is unbound if subtract_ke1 is neither
        # 'Power Law' nor 'Linear' — verify the settings UI forbids that.
        if self.settings['subtract_ke1'] == 'Power Law':
            # Fit power law
            A, m = self.fit_powerlaw(ke_to_fit, spec_to_fit)
            bg = A * self.ke_interp**m
        elif self.settings['subtract_ke1'] == 'Linear':
            # Fit line (there may be an easier way for 1D case)
            m, b = self.fit_line(ke_to_fit, spec_to_fit)
            bg = m * self.ke_interp + b
        self.total_spec -= bg
    if self.settings['subtract_tougaard']:
        R_loss = self.settings['R_loss']
        E_loss = self.settings['E_loss']
        # Energy step of the interpolated axis.
        dE = self.ke_interp[1] - self.ke_interp[0]
        # Always use a kernel out to 3 * E_loss to ensure enough feature size
        ke_kernel = np.arange(0, 3*E_loss, abs(dE))
        if not np.mod(len(ke_kernel),2) == 0:
            # Extend by one sample so the kernel length is even.
            ke_kernel = np.arange(0, 3*E_loss+dE, abs(dE))
        # Tougaard universal loss-function kernel.
        self.K_toug = (8.0/np.pi**2)*R_loss*E_loss**2 * ke_kernel / ((2.0*E_loss/np.pi)**2 + ke_kernel**2)**2
        # Normalize the kernel so the its area is equal to R_loss
        self.K_toug /= (np.sum(self.K_toug) * dE)/R_loss
        # Subtract the convolved inelastic background.
        self.total_spec -= dE * correlate1d(self.total_spec, self.K_toug,
                                            mode='nearest', origin=-len(ke_kernel)//2, axis=0)