def _apply_detrend(da, axis_num):
"""Wrapper function for applying detrending"""
if da.chunks:
func = detrend_wrap(detrendn)
da = xr.DataArray(func(da.data, axes=axis_num),
dims=da.dims, coords=da.coords)
else:
if da.ndim == 1:
da = xr.DataArray(sps.detrend(da),
dims=da.dims, coords=da.coords)
else:
da = detrendn(da, axes=axis_num)
# else:
# raise ValueError("Data should be dask array.")
return da
python类detrend()的实例源码
def _apply_method(x, fMeth, dtrd, method, wltCorr, wltWidth):
npts, ntrial = x.shape
nFce = len(fMeth)
xf = np.zeros((nFce, npts, ntrial))
# Detrend the signal :
if dtrd:
x = detrend(x, axis=0)
# Apply methods :
for k in range(nFce):
xf[k, ...] = fMeth[k](x)
# Correction for the wavelet (due to the wavelet width):
if (method == 'wavelet') and (wltCorr is not None):
w = 3*wltWidth
xf[:, 0:w, :] = xf[:, w+1:2*w+1, :]
xf[:, npts-w:npts, :] = xf[:, npts-2*w-1:npts-w-1, :]
return xf
def test_dft_4d():
"""Test the discrete Fourier transform on 2D data"""
N = 16
da = xr.DataArray(np.random.rand(N,N,N,N),
dims=['time','z','y','x'],
coords={'time':range(N),'z':range(N),
'y':range(N),'x':range(N)}
)
ft = xrft.dft(da, shift=False)
npt.assert_almost_equal(ft.values, np.fft.fftn(da.values))
with pytest.raises(NotImplementedError):
xrft.dft(da, detrend='linear')
with pytest.raises(NotImplementedError):
xrft.dft(da, dim=['time','y','x'], detrend='linear')
da_prime = xrft.detrendn(da[:,0].values, [0,1,2]) # cubic detrend over time, y, and x
npt.assert_almost_equal(xrft.dft(da[:,0].drop('z'),
dim=['time','y','x'],
shift=False, detrend='linear'
).values,
np.fft.fftn(da_prime))
def test_dft_3d_dask():
"""Test the discrete Fourier transform on 3D dask array data"""
N=16
da = xr.DataArray(np.random.rand(N,N,N), dims=['time','x','y'],
coords={'time':range(N),'x':range(N),
'y':range(N)}
)
daft = xrft.dft(da.chunk({'time': 1}), dim=['x','y'], shift=False)
# assert hasattr(daft.data, 'dask')
npt.assert_almost_equal(daft.values,
np.fft.fftn(da.chunk({'time': 1}).values, axes=[1,2])
)
with pytest.raises(ValueError):
xrft.dft(da.chunk({'time': 1, 'x': 1}), dim=['x'])
daft = xrft.dft(da.chunk({'x': 1}), dim=['time'],
shift=False, detrend='linear')
# assert hasattr(daft.data, 'dask')
da_prime = sps.detrend(da.chunk({'x': 1}), axis=0)
npt.assert_almost_equal(daft.values,
np.fft.fftn(da_prime, axes=[0])
)
def test_power_spectrum_dask():
"""Test the power spectrum function on dask data"""
N = 16
dim = ['x','y']
da = xr.DataArray(np.random.rand(2,N,N), dims=['time','x','y'],
coords={'time':range(2),'x':range(N),
'y':range(N)}).chunk({'time': 1}
)
ps = xrft.power_spectrum(da, dim=dim, density=False)
daft = xrft.dft(da, dim=['x','y'])
npt.assert_almost_equal(ps.values, (daft * np.conj(daft)).real.values)
ps = xrft.power_spectrum(da, dim=dim, window=True, detrend='constant')
daft = xrft.dft(da, dim=dim, window=True, detrend='constant')
coord = list(daft.coords)
test = (daft * np.conj(daft)).real/N**4
for i in dim:
test /= daft['freq_' + i + '_spacing']
npt.assert_almost_equal(ps.values, test)
npt.assert_almost_equal(np.ma.masked_invalid(ps).mask.sum(), 0.)
def test_cross_spectrum():
"""Test the cross spectrum function"""
N = 16
da = xr.DataArray(np.random.rand(N,N), dims=['x','y'],
coords={'x':range(N),'y':range(N)}
)
da2 = xr.DataArray(np.random.rand(N,N), dims=['x','y'],
coords={'x':range(N),'y':range(N)}
)
cs = xrft.cross_spectrum(da, da2, window=True, density=False,
detrend='constant')
daft = xrft.dft(da,
dim=None, shift=True, detrend='constant',
window=True)
daft2 = xrft.dft(da2,
dim=None, shift=True, detrend='constant',
window=True)
npt.assert_almost_equal(cs.values, np.real(daft*np.conj(daft2)))
npt.assert_almost_equal(np.ma.masked_invalid(cs).mask.sum(), 0.)
def processData(self, data):
try:
from scipy.signal import detrend
except ImportError:
raise Exception("DetrendFilter node requires the package scipy.signal.")
return detrend(data)
def processData(self, data):
try:
from scipy.signal import detrend
except ImportError:
raise Exception("DetrendFilter node requires the package scipy.signal.")
return detrend(data)
def segment_by_std_dev(series, increment=2, maximum=20):
"""
Divides a series into segments, minimizing standard deviation over window size. Windows are of varying size from
`increment` to `maximum * increment` at each offset `increment` within the series.
:param series:
:param increment:
:param maximum:
:return:
"""
duration = int(series.index[-2])
windows = []
for i in range(0, duration, increment):
for size in range(1, maximum + 1):
window = detrend(series[i:i + size*increment])
heappush(windows, (window.std() / (size*increment), i, i + size*increment))
segments = []
spots = set()
try:
while True:
window_agv_std, start, end = heappop(windows)
if any(i in spots for i in range(start, int(end))):
continue
for i in range(start, int(end)):
spots.add(int(i))
heappush(segments, (start, min(duration, end)))
except IndexError:
pass
return [heappop(segments) for _ in range(len(segments))]
def get_data(filename, channel=None, start=0, length=None, detrend=None):
"""Load selected data from HiDAQ recording.
:param filename: name of the datafile
:param channel: list of channels to read (base 0, None to read all channels)
:param start: sample index to start from
:param length: number of samples to read (None means read all available samples)
:param detrend: processing to be applied to each channel to remove offset/bias
(supported values: ``'linear'``, ``'constant'``, ``None``)
"""
if channel is None:
channel = range(_channels)
elif isinstance(channel, int):
channel = [channel]
if length is None:
length = get_data_length(filename)-start
with open(filename, 'rb') as f:
hdr = _np.fromfile(f, dtype=_np.dtype('>i4'), count=1)
f.seek(hdr[0]+start*_framelen, _os.SEEK_SET)
data = _np.fromfile(f, dtype=_np.dtype('>i2'), count=_channels*length)
data = _np.reshape(data, [length,_channels])
data = _np.take(data, channel, axis=1).astype(_np.float)
if len(channel) == 1:
data = data.ravel()
data = data/2048
if detrend is not None:
data = _sig.detrend(data, axis=0, type=detrend)
return data
def get_data(filename, channel=None, start=0, length=None, detrend='linear'):
"""Load selected data from DTLA recording.
:param filename: name of the datafile
:param channel: list of channels to read (base 0, None to read all channels)
:param start: sample index to start from
:param length: number of samples to read (None means read all available samples)
:param detrend: processing to be applied to each channel to remove offset/bias
(supported values: ``'linear'``, ``'constant'``, ``None``)
"""
if channel is None:
channel = range(_channels)
elif isinstance(channel, int):
channel = [channel]
if length is None:
length = get_data_length(filename)-start
with open(filename, 'rb') as f:
f.seek(start*_framelen, _os.SEEK_SET)
data = _np.fromfile(f, dtype=_np.uint16, count=_framelen/2*length)
data = _np.reshape(data, [length,_framelen/2])
data = data[:,2:]
data = _np.take(data, channel, axis=1).astype(_np.float)
if len(channel) == 1:
data = data.ravel()
data = 5*data/65536-2.5
if detrend is not None:
data = _sig.detrend(data, axis=0, type=detrend)
return data
def bandpassfilter(x,fs):
"""
:param x: a list of samples
:param fs: sampling frequency
:return: filtered list
"""
x = signal.detrend(x)
b = signal.firls(129,[0,0.6*2/fs,0.7*2/fs,3*2/fs,3.5*2/fs,1],[0,0,1,1,0,0],[100*0.02,0.02,0.02])
return signal.convolve(x,b,'valid')
def detrend_wrap(detrend_func):
"""
Wrapper function for `xrft.detrendn`.
"""
def func(a, axes=None):
if a.ndim > 4 or len(axes) > 3:
raise ValueError("Data has too many dimensions "
"and/or too many axes to detrend over.")
if axes is None:
axes = tuple(range(a.ndim))
else:
if len(set(axes)) < len(axes):
raise ValueError("Duplicate axes are not allowed.")
for each_axis in axes:
if len(a.chunks[each_axis]) != 1:
raise ValueError('The axis along the detrending is upon '
'cannot be chunked.')
if len(axes) == 1:
return dsar.map_blocks(sps.detrend, a, axis=axes[0],
chunks=a.chunks, dtype=a.dtype
)
else:
for each_axis in range(a.ndim):
if each_axis not in axes:
if len(a.chunks[each_axis]) != a.shape[each_axis]:
raise ValueError("The axes other than ones to detrend "
"over should have a chunk length of 1.")
return dsar.map_blocks(detrend_func, a, axes,
chunks=a.chunks, dtype=a.dtype
)
return func
def test_dft_1d(test_data_1d):
"""Test the discrete Fourier transform function on one-dimensional data."""
da = test_data_1d
Nx = len(da)
dx = float(da.x[1] - da.x[0]) if 'x' in da else 1
# defaults with no keyword args
ft = xrft.dft(da, detrend='constant')
# check that the frequency dimension was created properly
assert ft.dims == ('freq_x',)
# check that the coords are correct
freq_x_expected = np.fft.fftshift(np.fft.fftfreq(Nx, dx))
npt.assert_allclose(ft['freq_x'], freq_x_expected)
# check that a spacing variable was created
assert ft['freq_x_spacing'] == freq_x_expected[1] - freq_x_expected[0]
# make sure the function is lazy
assert isinstance(ft.data, type(da.data))
# check that the Fourier transform itself is correct
data = (da - da.mean()).values
ft_data_expected = np.fft.fftshift(np.fft.fft(data))
# because the zero frequency component is zero, there is a numerical
# precision issue. Fixed by setting atol
npt.assert_allclose(ft_data_expected, ft.values, atol=1e-14)
# redo without removing mean
ft = xrft.dft(da)
ft_data_expected = np.fft.fftshift(np.fft.fft(da))
npt.assert_allclose(ft_data_expected, ft.values)
# redo with detrending linear least-square fit
ft = xrft.dft(da, detrend='linear')
da_prime = sps.detrend(da.values)
ft_data_expected = np.fft.fftshift(np.fft.fft(da_prime))
npt.assert_allclose(ft_data_expected, ft.values, atol=1e-14)
if 'x' in da and not da.chunks:
da['x'].values[-1] *= 2
with pytest.raises(ValueError):
ft = xrft.dft(da)
def test_isotropic_ps_slope(N=512, dL=1., amp=1e1, s=-3.):
"""Test the spectral slope of isotropic power spectrum."""
theta = xr.DataArray(_synthetic_field(N, dL, amp, s),
dims=['y', 'x'],
coords={'y':range(N), 'x':range(N)})
iso_ps = xrft.isotropic_powerspectrum(theta, detrend='constant',
density=True)
npt.assert_almost_equal(np.ma.masked_invalid(iso_ps[1:]).mask.sum(), 0.)
y_fit, a, b = xrft.fit_loglog(iso_ps.freq_r.values[4:],
iso_ps.values[4:])
npt.assert_allclose(a, s, atol=.1)
def detrend(x, order=1, axis=-1):
"""Detrend the array x.
Parameters
----------
x : n-d array
Signal to detrend.
order : int
Fit order. Currently must be '0' or '1'.
axis : integer
Axis of the array to operate on.
Returns
-------
xf : array
x detrended.
Examples
--------
As in scipy.signal.detrend:
>>> randgen = np.random.RandomState(9)
>>> npoints = int(1e3)
>>> noise = randgen.randn(npoints)
>>> x = 3 + 2*np.linspace(0, 1, npoints) + noise
>>> (detrend(x) - noise).max() < 0.01
True
"""
from scipy.signal import detrend
if axis > len(x.shape):
raise ValueError('x does not have %d axes' % axis)
if order == 0:
fit = 'constant'
elif order == 1:
fit = 'linear'
else:
raise ValueError('order must be 0 or 1')
y = detrend(x, axis=axis, type=fit)
return y
def plot(self, scale_factor=1.0, plot_ax='x'):
time = np.arange(self.delay, (self.n_samples*self.dt + self.delay), self.dt)
# Normalize and detrend
norm_traces = np.zeros( np.shape(self.timeHistories) )
for k in range(self.n_channels):
current_trace = self.timeHistories[:,k]
current_trace = signal.detrend(current_trace) # Linear detrend
current_trace = current_trace / np.amax(current_trace) # Normalize by max value
current_trace = current_trace*scale_factor + self.position[k] # Scale and shift
norm_traces[:,k]= current_trace
# Plotting
if str.lower(plot_ax) == 'y':
fig = plt.figure( figsize=(2.75,6) )
ax = fig.add_axes([0.14,0.20,0.8,0.8])
for m in range(self.n_channels):
ax.plot(time, norm_traces[:,m],'b-', linewidth=0.5)
ax.set_xlim( ( min(time), max(time) ) )
ax.set_ylim( (-self.position[1], self.position[1]+self.position[len(self.position)-1] ) )
ax.set_xticklabels(ax.get_xticks(), fontsize=11, fontname='arial' )
ax.set_yticklabels(ax.get_yticks(), fontsize=11, fontname='arial' )
ax.grid(axis='x', linestyle='--')
ax.set_xlabel('Time (s)', fontsize=11, fontname="arial")
ax.set_ylabel('Normalized Amplitude', fontsize=11, fontname="arial")
ax.tick_params(labelsize=11)
ax.tick_params('x', length=4, width=1, which='major')
ax.tick_params('y', length=4, width=1, which='major')
elif str.lower(plot_ax) == 'x':
fig = plt.figure( figsize=(6,2.75) )
ax = fig.add_axes([0.14,0.20,0.8,0.75])
for m in range(self.n_channels):
ax.plot(norm_traces[:,m], time, 'b-', linewidth=0.5)
ax.set_ylim( ( max(time), min(time) ) )
ax.set_xlim( (-self.position[1], self.position[1]+self.position[len(self.position)-1] ) )
ax.set_yticklabels(ax.get_yticks(), fontsize=11, fontname='arial' )
ax.set_xticklabels(ax.get_xticks(), fontsize=11, fontname='arial' )
ax.grid(axis='y', linestyle='--')
ax.set_ylabel('Time (s)', fontsize=11, fontname="arial")
ax.set_xlabel('Normalized Amplitude', fontsize=11, fontname="arial")
ax.tick_params(labelsize=11)
ax.tick_params('y', length=4, width=1, which='major')
ax.tick_params('x', length=4, width=1, which='major')
return
# Method to pad zeros to achieve desired frequency sampling
def maxdelwind(source, target, rate=1, start_time=0., delta_time=1.,
sample_duration=.5):
'''
delay, times = maxdelwid(...)
Calculates a time-varying delay function from source (x)
to target (y) at intervals delta_time.
Parameters:
* source: source signal (reuqired)
* target: target signal (required)
* rate: sampling rate
* start_time: starting time for tfe calculations
* delta_time: distance between calculations
* sample_duration: length of signals used in tfe estimates
(longer than window_duration, used in averaging)
Returns:
* delay: max delay array
* times: times corresponding to delay estimates (array size M)
'''
# convert time to samples
sample_start = int(start_time*rate)
sample_delta = int(delta_time*rate)
sample_len = int(sample_duration*rate)
window = np.ones(sample_len)
n_samples = min(len(source), len(target))
sample_end = n_samples - sample_start - sample_len
delay = []
corr_strength = []
times = []
for block_start in np.arange(sample_start, sample_end, sample_delta):
block_end = block_start + sample_len
target_block = sig.detrend(target[block_start:block_end])
source_block = sig.detrend(source[block_start:block_end])
block_del, block_corr = block_delay(target_block, source_block,
window=window)
times.append((block_start+sample_len/2)/float(rate))
delay.append(block_del/float(rate))
corr_strength.append(block_corr)
return np.array(delay), np.array(corr_strength), np.array(times)
def power_spectrum(da, spacing_tol=1e-3, dim=None, shift=True, detrend=None, density=True,
window=False):
"""
Calculates the power spectrum of da.
.. math::
da' = da - \overline{da}
ps = \mathbb{F}(da') * {\mathbb{F}(da')}^*
Parameters
----------
da : `xarray.DataArray`
The data to be transformed
spacing_tol: float (default)
Spacing tolerance. Fourier transform should not be applied to uneven grid but
this restriction can be relaxed with this setting. Use caution.
dim : list (optional)
The dimensions along which to take the transformation. If `None`, all
dimensions will be transformed.
shift : bool (optional)
Whether to shift the fft output.
detrend : str (optional)
If `constant`, the mean across the transform dimensions will be
subtracted before calculating the Fourier transform (FT).
If `linear`, the linear least-square fit will be subtracted before
the FT.
density : list (optional)
If true, it will normalize the spectrum to spectral density
window : bool (optional)
Whether to apply a Hann window to the data before the Fourier
transform is taken
Returns
-------
ps : `xarray.DataArray`
Two-dimensional power spectrum
"""
if dim is None:
dim = da.dims
# the axes along which to take ffts
axis_num = [da.get_axis_num(d) for d in dim]
N = [da.shape[n] for n in axis_num]
daft = dft(da, spacing_tol,
dim=dim, shift=shift, detrend=detrend,
window=window)
coord = list(daft.coords)
ps = (daft * np.conj(daft)).real
if density:
ps /= (np.asarray(N).prod())**2
for i in dim:
ps /= daft['freq_' + i + '_spacing']
return ps
def test_detrend():
N = 16
x = np.arange(N+1)
y = np.arange(N-1)
t = np.linspace(-int(N/2), int(N/2), N-6)
z = np.arange(int(N/2))
d4d = (t[:,np.newaxis,np.newaxis,np.newaxis]
+ z[np.newaxis,:,np.newaxis,np.newaxis]
+ y[np.newaxis,np.newaxis,:,np.newaxis]
+ x[np.newaxis,np.newaxis,np.newaxis,:]
)
da4d = xr.DataArray(d4d, dims=['time','z','y','x'],
coords={'time':range(len(t)),'z':range(len(z)),'y':range(len(y)),
'x':range(len(x))}
)
func = xrft.detrend_wrap(xrft.detrendn)
#########
# Chunk along the `time` axis
#########
da = da4d.chunk({'time': 1})
with pytest.raises(ValueError):
func(da.data, axes=[0]).compute
with pytest.raises(ValueError):
func(da.data, axes=[0,1,2,3]).compute()
da_prime = func(da.data, axes=[2]).compute()
npt.assert_allclose(da_prime[0,0], sps.detrend(d4d[0,0], axis=0))
da_prime = func(da.data, axes=[1,2,3]).compute()
npt.assert_allclose(da_prime[0],
xrft.detrendn(d4d[0], axes=[0,1,2]))
#########
# Chunk along the `time` and `z` axes
#########
da = da4d.chunk({'time':1, 'z':1})
with pytest.raises(ValueError):
func(da.data, axes=[1,2]).compute()
with pytest.raises(ValueError):
func(da.data, axes=[2,2]).compute()
da_prime = func(da.data, axes=[2,3]).compute()
npt.assert_allclose(da_prime[0,0],
xrft.detrendn(d4d[0,0], axes=[0,1]))
s = np.arange(2)
d5d = d4d[np.newaxis,:,:,:,:] + s[:,np.newaxis,np.newaxis,
np.newaxis,np.newaxis]
da5d = xr.DataArray(d5d, dims=['s','time','z','y','x'],
coords={'s':range(len(s)),'time':range(len(t)),
'z':range(len(z)),'y':range(len(y)),
'x':range(len(x))}
)
da = da5d.chunk({'time':1})
with pytest.raises(ValueError):
func(da.data).compute()
def test_power_spectrum():
"""Test the power spectrum function"""
N = 16
da = xr.DataArray(np.random.rand(N,N), dims=['x','y'],
coords={'x':range(N),'y':range(N)}
)
ps = xrft.power_spectrum(da, window=True, density=False,
detrend='constant')
daft = xrft.dft(da,
dim=None, shift=True, detrend='constant',
window=True)
npt.assert_almost_equal(ps.values, np.real(daft*np.conj(daft)))
npt.assert_almost_equal(np.ma.masked_invalid(ps).mask.sum(), 0.)
### Normalized
dim = da.dims
ps = xrft.power_spectrum(da, window=True, detrend='constant')
daft = xrft.dft(da, window=True, detrend='constant')
coord = list(daft.coords)
test = np.real(daft*np.conj(daft))/N**4
for i in range(len(dim)):
test /= daft[coord[-i-1]].values
npt.assert_almost_equal(ps.values, test)
npt.assert_almost_equal(np.ma.masked_invalid(ps).mask.sum(), 0.)
### Remove mean
da = xr.DataArray(np.random.rand(5,20,30),
dims=['time', 'y', 'x'],
coords={'time': np.arange(5),
'y': np.arange(20), 'x': np.arange(30)})
ps = xrft.power_spectrum(da, dim=['y', 'x'],
window=True, density=False, detrend='constant'
)
daft = xrft.dft(da, dim=['y','x'], window=True, detrend='constant')
npt.assert_almost_equal(ps.values, np.real(daft*np.conj(daft)))
npt.assert_almost_equal(np.ma.masked_invalid(ps).mask.sum(), 0.)
### Remove least-square fit
da_prime = np.zeros_like(da.values)
for t in range(5):
da_prime[t] = numpy_detrend(da[t].values)
da_prime = xr.DataArray(da_prime, dims=da.dims, coords=da.coords)
ps = xrft.power_spectrum(da_prime, dim=['y', 'x'],
window=True, density=False, detrend='constant'
)
daft = xrft.dft(da_prime, dim=['y','x'], window=True, detrend='constant')
npt.assert_almost_equal(ps.values, np.real(daft*np.conj(daft)))
npt.assert_almost_equal(np.ma.masked_invalid(ps).mask.sum(), 0.)
def test_parseval():
"""Test whether the Parseval's relation is satisfied."""
N = 16
da = xr.DataArray(np.random.rand(N,N),
dims=['x','y'], coords={'x':range(N), 'y':range(N)})
da2 = xr.DataArray(np.random.rand(N,N),
dims=['x','y'], coords={'x':range(N), 'y':range(N)})
dim = da.dims
delta_x = []
for d in dim:
coord = da[d]
diff = np.diff(coord)
delta = diff[0]
delta_x.append(delta)
window = np.hanning(N) * np.hanning(N)[:, np.newaxis]
ps = xrft.power_spectrum(da, window=True, detrend='constant')
da_prime = da.values - da.mean(dim=dim).values
npt.assert_almost_equal(ps.values.sum(),
(np.asarray(delta_x).prod()
* ((da_prime*window)**2).sum()
), decimal=5
)
cs = xrft.cross_spectrum(da, da2, window=True, detrend='constant')
da2_prime = da2.values - da2.mean(dim=dim).values
npt.assert_almost_equal(cs.values.sum(),
(np.asarray(delta_x).prod()
* ((da_prime*window)
* (da2_prime*window)).sum()
), decimal=5
)
d3d = xr.DataArray(np.random.rand(N,N,N),
dims=['time','y','x'],
coords={'time':range(N), 'y':range(N), 'x':range(N)}
).chunk({'time':1})
ps = xrft.power_spectrum(d3d, dim=['x','y'], window=True, detrend='linear')
npt.assert_almost_equal(ps[0].values.sum(),
(np.asarray(delta_x).prod()
* ((numpy_detrend(d3d[0].values)*window)**2).sum()
), decimal=5
)
def process_traces(self, s, h):
""" Performs data processing operations on traces
"""
# remove linear trend
#s = _signal.detrend(s)
# mute (added by DmBorisov)
if PAR.MUTE:
vel = PAR.MUTESLOPE
off = PAR.T0_TOP
inn = PAR.MUTEINNER
mute_out = PAR.MUTEOUTER
s = smute(s, h, vel, off, inn, mute_out, constant_spacing=False)
vel = PAR.MUTESLOPE_BTM
off = PAR.T0_BOT
s = smutelow(s, h, vel, off, inn, mute_out, constant_spacing=False)
# filter data (modified by DmBorisov)
if PAR.BANDPASS:
s = sbandpass(s, h, PAR.FREQLO, PAR.FREQHI)
# scale all traces by a single value (norm) (added by DmBorisov)
if PAR.NORMALIZE_ALL:
sum_norm = np.linalg.norm(s, ord=2)
if sum_norm > 0:
s /= sum_norm
# mute (added by DmBorisov)
if PAR.MUTE:
vel = PAR.MUTESLOPE
off = PAR.T0_TOP
inn = PAR.MUTEINNER
mute_out = PAR.MUTEOUTER
s = smute(s, h, vel, off, inn, mute_out, constant_spacing=False)
vel = PAR.MUTESLOPE_BTM
off = PAR.T0_BOT
s = smutelow(s, h, vel, off, inn, mute_out, constant_spacing=False)
return s