def multinomial_entropy(probs, count):
    """Return the entropy of a multinomial with the given probs and count.

    The outcome distribution is built by convolving ``probs`` with itself
    ``count - 1`` times; the result is then passed to ``entropy``.

    Args:
        probs: A 1-dimensional array of normalized probabilities.
        count: The number of draws in a multinomial distribution.

    Returns:
        A number in [0, count * len(probs)] representing entropy.

    Raises:
        NotImplementedError: If more than one draw is requested and ``probs``
            has more than two entries.
    """
    assert count > 0
    extra_draws = range(count - 1)
    # The support check does not depend on the loop variable, so it is done
    # once up front; it only matters when at least one convolution would run.
    if extra_draws and len(probs) > 2:
        raise NotImplementedError(
            'Only categorical and binomial are supported')
    distribution = probs
    for _ in extra_draws:
        distribution = np.convolve(distribution, probs)
    return entropy(distribution)
# Example source code using Python's convolve()
def cal_running_std(event_freq, n=16):
    """Calculate the running sample variance of an event-frequency series.

    NOTE: despite the function name, no square root is taken.  The value
    returned for each window is the unbiased sample variance
    ``(sum(x**2) - sum(x)**2 / n) / (n - 1)``; apply ``np.sqrt`` to the
    result to obtain a standard deviation.

    Parameters
    ----------
    event_freq : numpy.ndarray
        Two-dimensional array; the event counts are read from column 1.
    n : int
        Running window length (must be at least 2).

    Returns
    -------
    o : numpy.ndarray
        Running sample variance, one value per fully covered window
        (``len(event_freq) - n + 1`` values when the input has at least
        ``n`` rows).

    Raises
    ------
    ValueError
        If ``n`` is smaller than 2 (the ``n - 1`` divisor would be zero).
    """
    if n < 2:
        raise ValueError("n must be at least 2 to compute a sample variance")
    # Work in floating point so that squaring small integer dtypes cannot
    # overflow before the window sums are taken.
    counts = np.asarray(event_freq)[:, 1].astype(float)
    window = np.ones((n, ))
    sum_of_squares = np.convolve(counts ** 2, window, mode="valid")
    plain_sum = np.convolve(counts, window, mode="valid")
    return (sum_of_squares - plain_sum ** 2 / n) / float(n - 1)
def smoothCurve(X, Fac):
    """Upsample each column of X by a factor of Fac and box-filter it.

    Every column is resampled onto ``X.shape[0] * Fac`` points with
    ``interp.spline`` and then convolved with a box filter of length
    ``2 * Fac`` scaled by ``0.5 / Fac``.  Finally one row is dropped from the
    end and ``2 * Fac`` rows are trimmed from both ends.

    NOTE(review): ``interp`` is presumably ``scipy.interpolate``; its
    ``spline`` function no longer exists in recent SciPy releases - confirm
    which SciPy version this code targets.

    :param X: 2-D array, one curve point per row
    :param Fac: integer upsampling factor
    :returns: the resampled, smoothed and trimmed 2-D array
    """
    n_points, n_dims = X.shape[0], X.shape[1]
    n_fine = n_points * Fac
    coarse_axis = range(n_points)
    fine_axis = np.linspace(0, n_points, n_fine)
    box = np.ones(Fac * 2)
    gain = 0.5 / Fac
    result = np.zeros((n_fine, n_dims))
    n_out = 0
    for col in range(n_dims):
        result[:, col] = interp.spline(coarse_axis, X[:, col], fine_axis)
        # Smooth with box filter
        filtered = gain * np.convolve(result[:, col], box, mode='same')
        n_out = len(filtered)
        result[0:n_out, col] = filtered
    result = result[0:n_out - 1, :]
    return result[2 * Fac:-2 * Fac, :]
def _plot_walk(self, ax, parameter, data, truth=None, extents=None,
               convolve=None, color=None):  # pragma: no cover
    """Draw the walk of one parameter as a scatter plot on ``ax``.

    ``data`` is plotted against its sample index.  When ``convolve`` is
    given, a running mean over that many samples is overlaid as a dotted
    line; when ``truth`` is given, a horizontal line is drawn at that value
    using ``self.parent.config_truth`` as keyword arguments.

    :param ax: axes object to draw on
    :param parameter: text used as the y-axis label
    :param data: array of samples (must expose ``.size``)
    :param truth: optional value marked with a horizontal line
    :param extents: optional y-axis limits passed to ``ax.set_ylim``
    :param convolve: optional integer width of the running-mean window
    :param color: optional point colour; ``"#0345A1"`` is used when omitted
    """
    if extents is not None:
        ax.set_ylim(extents)
    assert convolve is None or isinstance(convolve, int), \
        "Convolve must be an integer pixel window width"

    steps = np.arange(data.size)
    ax.set_xlim(0, steps[-1])
    ax.set_ylabel(parameter)

    point_color = "#0345A1" if color is None else color
    ax.scatter(steps, data, c=point_color, s=2, marker=".",
               edgecolors="none", alpha=0.5)

    tick_limit = self.parent.config["max_ticks"]
    ax.yaxis.set_major_locator(MaxNLocator(tick_limit, prune="lower"))

    if convolve is not None:
        line_color = self.parent.color_finder.scale_colour(point_color, 0.5)
        kernel = np.ones(convolve) / convolve
        running_mean = np.convolve(data, kernel, mode="same")
        # The final sample is left out of the overlaid line.
        ax.plot(steps[:-1], running_mean[:-1], ls=':', color=line_color,
                alpha=1)

    if truth is not None:
        ax.axhline(truth, **self.parent.config_truth)
def moving_average(data, window_size=5):
    """
    Moving average.

    The window length is ``int(window_size)`` and the same integer is used
    as the divisor, so the weights always sum to one (previously a
    non-integer ``window_size`` produced a biased average).

    Parameters
    ----------
    data : array_like
        An array of values.
    window_size : int, optional
        Moving average window size; truncated to an integer.

    Returns
    -------
    result : numpy.ndarray
        Moving averaged array computed with ``mode='valid'``, i.e. only
        positions where the window and the data overlap completely.

    Raises
    ------
    ValueError
        If the truncated window size is smaller than 1.
    """
    size = int(window_size)
    if size < 1:
        raise ValueError("window_size must be at least 1")
    window = np.ones(size) / float(size)
    return np.convolve(data, window, 'valid')
def boxcar(y, window_size=3):
    """
    Replace every value of the input by the mean of its neighborhood.

    The neighborhood is a window of ``window_size`` equal weights; the
    convolution uses ``mode='same'``.

    Parameters
    ==========
    y : array
        The vector to be smoothed; it must expose an ``index`` attribute,
        which is reused for the result.
    window_size : int
        An odd integer describing the window size.

    Returns
    =======
    : array
        The smoothed values wrapped in a ``Series`` indexed like ``y``.
    """
    weights = np.ones(window_size) / window_size
    smoothed = np.convolve(y, weights, mode='same')
    return Series(smoothed, index=y.index)
def gaussian(y, window_size=3, sigma=2):
    """
    Apply a gaussian filter to smooth the input vector

    The window is taken from ``signal.windows.gaussian`` (the top-level
    ``signal.gaussian`` alias is no longer available in current SciPy).
    The window is used as returned, without normalizing its sum to one, so
    the output is scaled by the sum of the window weights.

    Parameters
    ==========
    y : array
        The input array; it must expose an ``index`` attribute, which is
        reused for the result.
    window_size : int
        An odd integer describing the size of the filter.
    sigma : float
        The standard deviation of the gaussian window, in samples.

    Returns
    =======
    : array
        The filtered values wrapped in a ``Series`` indexed like ``y``.
    """
    filt = signal.windows.gaussian(window_size, sigma)
    return Series(signal.convolve(y, filt, mode='same'), index=y.index)
def convolve(infile, ir_name, level = 0.5):
    """Mix a recording with its convolution by an impulse response.

    The input is read with ``monoWavRead``, convolved with the impulse
    response stored at ``./sounds/ir_<ir_name>.wav``, truncated to the
    original length and blended as ``wet * level + dry * (1 - level)``.
    The result is written into ``outfile_path``; when ``FILE_DELETION`` is
    set the written file is passed to ``extractFeaturesAndDelete``.

    Args:
        infile (str): Filename
        ir_name: can be 'smartphone_mic' or 'classroom'
        level (float) : can be between 0 and 1, default value = 0.5
    """
    sample_rate, samples = monoWavRead(filename=infile)
    dry = np.copy(samples)

    # Change the path below for the sounds folder
    ir_path = './sounds/ir_{0}.wav'.format(ir_name)
    # NOTE(review): the impulse response's sample rate is read but never
    # compared with the input's - confirm both files share one rate.
    ir_rate, impulse = monoWavRead(filename=ir_path)

    wet = np.convolve(dry, impulse, 'full')[0:dry.shape[0]]
    mixed = wet * level + dry * (1 - level)

    # Change the output file name to suit your requirements here
    stem = os.path.basename(infile).split(".")[0]
    suffix = "{0}_convolved{1}.wav".format(ir_name, level)
    outfile = os.path.join(outfile_path, stem + suffix)
    write(filename = outfile, rate = sample_rate, data = mixed)

    if (FILE_DELETION):
        extractFeaturesAndDelete(outfile)
def convolve_lsf(flux, lsf):
    """Convolve a flux array with a line-spread function (LSF).

    The quantity ``1 - flux`` is convolved (``mode='same'``) with ``lsf``
    and divided by ``sum(lsf)``; convolving ``1 - flux`` rather than
    ``flux`` removes edge effects without padding, because the implicit
    zeros outside the array then correspond to a flux of one.

    When ``flux`` is shorter than ``lsf`` it is first left-padded with ones
    so that ``mode='same'`` keeps the flux's length; the padding is removed
    again before returning.

    :param flux: 1-D array_like of flux values
    :param lsf: scalar or 1-D array_like kernel
    :returns: numpy array with the same length as ``flux``
    """
    flux = np.asarray(flux)
    # Normalise once so that scalar kernels work for every later use.
    lsf = np.atleast_1d(lsf)
    n_pad = 0
    if len(flux) < len(lsf):
        # Add padding to make sure to return the same length in flux.
        n_pad = len(lsf) - len(flux) + 1
        flux = np.hstack([np.ones(n_pad), flux])
    conv_flux = 1 - np.convolve(1 - flux, lsf, mode='same') / np.sum(lsf)
    return conv_flux[n_pad:]
###############################################################################
# Convergence
###############################################################################
def moving_average(x, n, type='simple'):
    """
    compute an n period moving average.

    type is 'simple' | 'exponential'; any value other than 'simple' selects
    the exponential weights.

    The first ``n`` entries of the result, which include the warm-up region
    where the window is not yet full, are overwritten with the value at
    index ``n``.  When the input has ``n`` or fewer samples there is no such
    value, so the partially windowed values are returned unchanged instead
    of raising ``IndexError``.

    :param x: array_like of samples
    :param n: window length (int)
    :param type: weighting scheme, see above
    :returns: numpy array with the same length as ``x``
    """
    x = np.asarray(x)
    if type == 'simple':
        weights = np.ones(n)
    else:
        weights = np.exp(np.linspace(-1., 0., n))

    weights /= weights.sum()

    a = np.convolve(x, weights, mode='full')[:len(x)]
    if len(a) > n:
        a[:n] = a[n]
    return a
def ma(x, win):
    """Compute the moving average of x with a window equal to win

    The average is causal: output ``k`` is the mean of the last ``win``
    samples up to and including ``x[k]``.  During the first ``win - 1``
    positions, where fewer samples are available, the sum is divided by the
    number of samples actually seen.

    Compared with the earlier implementation, ``win == 1`` now returns the
    data itself (it used to return an empty array) and inputs shorter than
    the window are handled instead of raising a broadcast error.

    Args:
        x (numpy.array): data
        win (int): window

    Returns:
        numpy.array: the smoothed data, same length as ``x``

    Raises:
        ValueError: if ``win`` is smaller than 1.
    """
    if win < 1:
        raise ValueError("win must be at least 1")
    x = np.asarray(x)
    n_samples = x.shape[0]
    kernel = np.ones(win, dtype=np.float64)
    running_sum = np.convolve(x, kernel, mode='full')[:n_samples]
    # Number of samples contributing to each output position: 1, 2, ...,
    # capped at the window length.
    counts = np.minimum(np.arange(1, n_samples + 1, dtype=np.float64), win)
    return running_sum / counts
def frequencyResponse(self, freqs=None):
    """Return ``spsig.freqz`` evaluated for this filter.

    For the band types 'allpass' and 'allstop' the response of the constant
    numerator 1 or 0 is returned.  Otherwise the filter's numerator and
    denominator coefficients are used; when ``self.zeroPhase`` is set each
    polynomial is first convolved with its own reverse.

    :param freqs: forwarded to ``spsig.freqz`` as ``worN``
    :returns: whatever ``spsig.freqz`` returns
    """
    constant_numerators = {'allpass': 1, 'allstop': 0}
    band = self.bandType
    if band in constant_numerators:
        return spsig.freqz(constant_numerators[band], worN=freqs)

    num, den = self.numCoef, self.denomCoef

    if self.zeroPhase:
        # http://www.mathworks.com/matlabcentral/newsreader/view_thread/245017
        num = np.convolve(num, num[::-1])
        den = np.convolve(den, den[::-1])

    # freqz does not preserve dtype of arguments, report bug XXX - idfah
    return spsig.freqz(num, den, worN=freqs)
def probs_to_classes(self, probabilities):
    """Takes a likelihood matrix produced by predict_proba and returns
    the classification for each entry

    Naive argmax returns a very noisy signal - windowing helps focus on
    strongly matching areas.

    Each row of ``probabilities`` (one row per class) is smoothed with a
    normalized Hamming window before the per-column argmax is taken.  The
    window length is ``smooth_time / dt`` rounded to whole samples, where
    ``smooth_time`` comes from ``self.params`` (default 0.1) and ``dt`` is
    the spacing of the first two entries of ``self.active_song.time``.  The
    length is clamped to at least one sample so that a smoothing time
    shorter than the sampling step degrades to a plain argmax instead of
    failing on an empty window.

    :param probabilities: 2-D array indexed as ``[class, sample]``
    :returns: array with the index of the winning class for every sample
    """
    smooth_time = self.params.get('smooth_time', 0.1)
    dt = self.active_song.time[1] - self.active_song.time[0]
    windowsize = max(1, int(np.round(smooth_time / dt)))
    window = signal.get_window('hamming', windowsize)
    window /= np.sum(window)
    num_classes = probabilities.shape[0]
    smooth_prbs = [
        np.convolve(probabilities[i, :], window, mode='same')
        for i in range(num_classes)
    ]
    return np.argmax(np.stack(smooth_prbs, axis=0), axis=0)
def savitzky_golay(y, window_size, order, deriv=0, rate=1):
    """Smooth (or differentiate) data with a Savitzky-Golay filter.

    A polynomial of degree ``order`` is fitted by least squares inside a
    sliding window of ``window_size`` samples; the filter coefficients are
    row ``deriv`` of the pseudo-inverse of the window's design matrix,
    scaled by ``rate**deriv * factorial(deriv)``.  The signal is extended at
    both ends with values derived from the signal itself so that the output
    has the same length as the input.

    :param y: 1-D array_like of samples
    :param window_size: positive odd window length
    :param order: polynomial order, at most ``window_size - 2``
    :param deriv: order of the derivative to compute (0 means smoothing)
    :param rate: scale factor applied as ``rate**deriv``
    :returns: numpy array with the same length as ``y``
    :raises ValueError: if ``window_size`` or ``order`` cannot be converted
        to int
    :raises TypeError: if ``window_size`` is not a positive odd number or is
        too small for ``order``
    """
    import numpy as np
    from math import factorial

    try:
        # Plain int() replaces the removed ``np.int`` alias.
        window_size = abs(int(window_size))
        order = abs(int(order))
    except ValueError:
        raise ValueError("window_size and order have to be of type int")
    if window_size % 2 != 1 or window_size < 1:
        raise TypeError("window_size size must be a positive odd number")
    if window_size < order + 2:
        raise TypeError("window_size is too small for the polynomials order")
    y = np.asarray(y)
    order_range = range(order + 1)
    half_window = (window_size - 1) // 2
    # Design matrix as a plain ndarray (``np.mat`` is no longer available in
    # current NumPy); one row per window offset, one column per power.
    b = np.array([[k ** i for i in order_range]
                  for k in range(-half_window, half_window + 1)], dtype=float)
    m = np.linalg.pinv(b)[deriv] * rate ** deriv * factorial(deriv)
    # Pad both ends with values mirrored about the first / last sample.
    firstvals = y[0] - np.abs(y[1:half_window + 1][::-1] - y[0])
    lastvals = y[-1] + np.abs(y[-half_window - 1:-1][::-1] - y[-1])
    y = np.concatenate((firstvals, y, lastvals))
    return np.convolve(m[::-1], y, mode='valid')
def savitzky_golay(y, window_size, order, deriv=0, rate=1):
    """Apply a Savitzky-Golay smoothing / differentiation filter to ``y``.

    The filter coefficients come from a least-squares polynomial fit of
    degree ``order`` over ``window_size`` samples (row ``deriv`` of the
    pseudo-inverse of the design matrix, times
    ``rate**deriv * factorial(deriv)``).  Both ends of the signal are padded
    with values built from the signal itself, so the result has as many
    samples as the input.

    :param y: 1-D array_like of samples
    :param window_size: positive odd window length
    :param order: polynomial order, at most ``window_size - 2``
    :param deriv: derivative order (0 means smoothing only)
    :param rate: scale factor applied as ``rate**deriv``
    :returns: numpy array with the same length as ``y``
    :raises ValueError: if ``window_size`` or ``order`` cannot be converted
        to int
    :raises TypeError: if ``window_size`` is not a positive odd number or is
        too small for ``order``
    """
    import numpy as np
    from math import factorial

    try:
        # Built-in int() is used because the ``np.int`` alias was removed.
        window_size = abs(int(window_size))
        order = abs(int(order))
    except ValueError:
        raise ValueError("window_size and order have to be of type int")
    if window_size % 2 != 1 or window_size < 1:
        raise TypeError("window_size size must be a positive odd number")
    if window_size < order + 2:
        raise TypeError("window_size is too small for the polynomials order")
    y = np.asarray(y)
    order_range = range(order + 1)
    half_window = (window_size - 1) // 2
    # ndarray design matrix instead of the removed ``np.mat``.
    b = np.array([[k ** i for i in order_range]
                  for k in range(-half_window, half_window + 1)], dtype=float)
    m = np.linalg.pinv(b)[deriv] * rate ** deriv * factorial(deriv)
    # Mirror the neighbouring samples about the end points to pad the signal.
    firstvals = y[0] - np.abs(y[1:half_window + 1][::-1] - y[0])
    lastvals = y[-1] + np.abs(y[-half_window - 1:-1][::-1] - y[-1])
    y = np.concatenate((firstvals, y, lastvals))
    return np.convolve(m[::-1], y, mode='valid')
def savitzky_golay(y, window_size, order, deriv=0, rate=1):
    """Savitzky-Golay filter: local polynomial smoothing of a 1-D signal.

    Coefficients are row ``deriv`` of the pseudo-inverse of the
    ``window_size`` x ``order + 1`` design matrix, multiplied by
    ``rate**deriv * factorial(deriv)``.  The input is padded at both ends
    with values derived from its own first and last samples, which keeps the
    output the same length as the input.

    :param y: 1-D array_like of samples
    :param window_size: positive odd window length
    :param order: polynomial order, at most ``window_size - 2``
    :param deriv: derivative order (0 means smoothing only)
    :param rate: scale factor applied as ``rate**deriv``
    :returns: numpy array with the same length as ``y``
    :raises ValueError: if ``window_size`` or ``order`` cannot be converted
        to int
    :raises TypeError: if ``window_size`` is not a positive odd number or is
        too small for ``order``
    """
    import numpy as np
    from math import factorial

    try:
        # ``np.int`` no longer exists; the built-in is equivalent here.
        window_size = abs(int(window_size))
        order = abs(int(order))
    except ValueError:
        raise ValueError("window_size and order have to be of type int")
    if window_size % 2 != 1 or window_size < 1:
        raise TypeError("window_size size must be a positive odd number")
    if window_size < order + 2:
        raise TypeError("window_size is too small for the polynomials order")
    y = np.asarray(y)
    order_range = range(order + 1)
    half_window = (window_size - 1) // 2
    # Build the design matrix as an ndarray (``np.mat`` was removed).
    b = np.array([[k ** i for i in order_range]
                  for k in range(-half_window, half_window + 1)], dtype=float)
    m = np.linalg.pinv(b)[deriv] * rate ** deriv * factorial(deriv)
    # Padding: neighbours mirrored about the first and last sample.
    firstvals = y[0] - np.abs(y[1:half_window + 1][::-1] - y[0])
    lastvals = y[-1] + np.abs(y[-half_window - 1:-1][::-1] - y[-1])
    y = np.concatenate((firstvals, y, lastvals))
    return np.convolve(m[::-1], y, mode='valid')
def synth_audio(audiofile, impfile, chns, angle, nsfile=None, snrlevel=None, outname=None, outsplit=False):
    """Synthesize multi-channel audio by convolving a recording with impulse responses.

    The recording is read with ``wavfile.read``, converted to float32 and
    divided by its peak absolute value.  For each channel ``1..chns`` an
    impulse response is loaded from
    ``<impfile>D<angle:03d>_ch<channel>.flt`` (raw float32) and convolved
    with the audio (``mode='same'``).  The result is passed through
    ``add_noise`` and then either returned or written to disk.

    NOTE(review): ``np.convolve`` needs 1-D input, so the recording is
    presumably mono - confirm against the callers.

    :param audiofile: path of the source wav file
    :param impfile: path prefix of the impulse-response files
    :param chns: number of output channels
    :param angle: integer used in the impulse-response file names
    :param nsfile: forwarded to ``add_noise``
    :param snrlevel: forwarded to ``add_noise``
    :param outname: output path without extension; ``None`` means return
        the data instead of writing it
    :param outsplit: write one ``<outname>_chNN.wav`` file per channel
        instead of a single ``<outname>.wav``
    :returns: ``(sample_rate, data)`` with ``data`` shaped
        ``(chns, samples)`` when ``outname`` is ``None``, otherwise ``None``
    """
    FreqSamp, raw = wavfile.read(audiofile)
    as_float = raw.astype(np.float32)
    audio = as_float / np.amax(np.absolute(as_float))

    gen_audio = np.zeros((audio.shape[0], chns), dtype=np.float32)
    for col in range(chns):
        # Impulse-response files are numbered from 1, columns from 0.
        ir_path = '{}D{:03d}_ch{}.flt'.format(impfile, angle, col + 1)
        impulse = np.fromfile(ir_path, dtype=np.float32)
        gen_audio[:, col] = np.convolve(audio, impulse, mode='same')

    gen_audio = add_noise(gen_audio, nsfile=nsfile, snrlevel=snrlevel)

    if outname is None:
        return FreqSamp, np.transpose(gen_audio)

    if outsplit:
        for ch in range(chns):
            audiolab.wavwrite(gen_audio[:, ch], '{}_ch{:02d}.wav'.format(outname, ch), fs=FreqSamp, enc='pcm16')
    else:
        audiolab.wavwrite(gen_audio, '{}.wav'.format(outname), fs=FreqSamp, enc='pcm16')
    return
def savitzky_golay(y, window_size, order, deriv=0, rate=1):
    """Smooth (or differentiate) a 1-D signal with a Savitzky-Golay filter.

    A least-squares polynomial of degree ``order`` is fitted inside a window
    of ``window_size`` samples; the coefficients are row ``deriv`` of the
    pseudo-inverse of the design matrix, scaled by
    ``rate**deriv * factorial(deriv)``.  The output has the same length as
    the input.

    :param y: 1-D array_like of samples
    :param window_size: positive odd window length
    :param order: polynomial order, at most ``window_size - 2``
    :param deriv: derivative order (0 means smoothing only)
    :param rate: scale factor applied as ``rate**deriv``
    :returns: numpy array with the same length as ``y``
    :raises ValueError: if ``window_size`` or ``order`` cannot be converted
        to int
    :raises TypeError: if ``window_size`` is not a positive odd number or is
        too small for ``order``
    """
    from math import factorial

    try:
        # Built-in int() replaces the removed ``np.int`` alias.
        window_size = abs(int(window_size))
        order = abs(int(order))
    except ValueError:
        raise ValueError("window_size and order have to be of type int")
    if window_size % 2 != 1 or window_size < 1:
        raise TypeError("window_size size must be a positive odd number")
    if window_size < order + 2:
        raise TypeError("window_size is too small for the polynomials order")
    y = np.asarray(y)
    order_range = range(order + 1)
    half_window = (window_size - 1) // 2
    # precompute coefficients (plain ndarray; ``np.mat`` was removed)
    b = np.array([[k ** i for i in order_range]
                  for k in range(-half_window, half_window + 1)], dtype=float)
    m = np.linalg.pinv(b)[deriv] * rate ** deriv * factorial(deriv)
    # pad the signal at the extremes with
    # values taken from the signal itself
    firstvals = y[0] - np.abs(y[1:half_window + 1][::-1] - y[0])
    lastvals = y[-1] + np.abs(y[-half_window - 1:-1][::-1] - y[-1])
    y = np.concatenate((firstvals, y, lastvals))
    return np.convolve(m[::-1], y, mode='valid')
def running_average(obs, ws):
    '''
    calculates a running average
    obs     --  observations
    ws      --  window size (number of points to average)

    The average is centred (``mode='same'``).  Near both ends, where the
    window sticks out of the data and the convolution implicitly sees
    zeros, each value is rescaled by ``ws / (number of points actually
    inside the window)``.

    If the computation fails with a ValueError or TypeError (for example on
    empty input or a non-positive window) an array of 0.5 shaped like
    ``obs`` is returned.  The former handler dropped into an ``ipdb``
    debugger session before producing that fallback; the breakpoint has
    been removed.
    '''
    ws = int(ws)
    try:
        tmp_vals = np.convolve(np.ones(ws, dtype=float) / ws, obs, mode='same')
        half = ws // 2
        # fix the edges. using mode='same' assumes zeros outside the range
        if ws % 2 == 0:
            tmp_vals[:half] *= float(ws) / np.arange(half, ws)
            if half > 1:
                tmp_vals[-half + 1:] *= float(ws) / np.arange(ws - 1, half, -1.0)
        else:
            tmp_vals[:half] *= float(ws) / np.arange(half + 1, ws)
            # For odd ws the original ``-ws//2`` floors to -(half + 1).
            tmp_vals[-(half + 1):] *= float(ws) / np.arange(ws, half, -1.0)
    except (ValueError, TypeError):
        tmp_vals = 0.5 * np.ones_like(obs, dtype=float)
    return tmp_vals
def smooth(x, window_len=10, window='hanning'):
    """Smooth a 1-D signal by convolving it with a normalized window.

    The signal is extended at both ends with point-reflected copies of
    itself (``2 * x[0] - x[...]`` and ``2 * x[-1] - x[...]``) before the
    convolution, and the extension is trimmed afterwards so that the result
    has the same length as the input.

    :param x: 1-D array_like of samples
    :param window_len: window length; values below 3 return the input
        (converted to an array) unchanged
    :param window: one of 'flat', 'hanning', 'hamming', 'bartlett',
        'blackman'; 'flat' gives a moving average, the other names select
        the NumPy window function of the same name
    :returns: numpy array with the same length as ``x``
    :raises ValueError: if ``x`` is not 1-D, is shorter than ``window_len``,
        or ``window`` is not a supported name
    """
    x = np.array(x)
    if x.ndim != 1:
        raise ValueError("smooth only accepts 1 dimension arrays.")
    if x.size < window_len:
        raise ValueError("Input vector needs to be bigger than window size.")
    if window_len < 3:
        return x
    if window not in ('flat', 'hanning', 'hamming', 'bartlett', 'blackman'):
        raise ValueError("Window is on of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'")
    s = np.r_[2 * x[0] - x[window_len - 1::-1], x, 2 * x[-1] - x[-1:-window_len:-1]]
    if window == 'flat':  # moving average
        w = np.ones(window_len, 'd')
    else:
        # Attribute lookup instead of eval(); the name was validated above.
        w = getattr(np, window)(window_len)
    y = np.convolve(w / w.sum(), s, mode='same')
    return y[window_len:-window_len + 1]
def measureLoop(self):
    """ Measure 100 values, add them to buffer and remove the 100 oldest values.

    If a stop was requested, the request is cleared, ``self.unlock()`` is
    called and nothing is measured.  Otherwise 100 readings are taken from
    ``self._data_logic.getData()`` and stored in channel 0 of a new data
    block (the other channels of the block stay zero).  The buffer is
    rotated by 100 rows and the block is written into its last 100 rows,
    i.e. exactly the rows that the rotation wrapped around.  (The previous
    slice ``[-101:-1]`` was shifted by one row: it overwrote one valid
    sample and left the oldest, wrapped-around sample in the last row.)

    Each channel of the mirrored buffer is then convolved with a normalized
    Hanning window of length ``self.window_len`` and stored in
    ``self.smooth``, and ``self.sigRepeat`` is emitted.

    NOTE(review): ``self.smooth`` must have ``len(self.buf) +
    self.window_len - 1`` rows for the assignment to succeed - confirm
    where it is allocated.
    """
    if self.stopRequest:
        self.stopRequest = False
        self.unlock()
        return

    n_new = 100
    n_channels = self._data_logic.getChannels()
    data = np.zeros((n_new, n_channels))
    data[:, 0] = np.array([self._data_logic.getData() for _ in range(n_new)])

    self.buf = np.roll(self.buf, -n_new, axis=0)
    self.buf[-n_new:] = data

    w = np.hanning(self.window_len)
    # Mirror the buffer at both ends so the 'valid' convolution covers it.
    s = np.r_[self.buf[self.window_len-1:0:-1], self.buf, self.buf[-1:-self.window_len:-1]]
    for channel in range(n_channels):
        convolved = np.convolve(w/w.sum(), s[:, channel], mode='valid')
        self.smooth[:, channel] = convolved
    self.sigRepeat.emit()
def moving_average(x, n, type='simple'):
    """
    compute an n period moving average.

    type is 'simple' | 'exponential'; every value other than 'simple'
    selects the exponential weights.

    The leading ``n`` entries of the result (which contain the warm-up
    values computed from an incomplete window) are replaced by the value at
    index ``n``.  Inputs with ``n`` or fewer samples have no such value;
    for them the partially windowed values are returned as they are rather
    than raising ``IndexError``.

    :param x: array_like of samples
    :param n: window length (int)
    :param type: weighting scheme, see above
    :returns: numpy array with the same length as ``x``
    """
    x = np.asarray(x)
    if type == 'simple':
        weights = np.ones(n)
    else:
        weights = np.exp(np.linspace(-1., 0., n))

    weights /= weights.sum()

    a = np.convolve(x, weights, mode='full')[:len(x)]
    if len(a) > n:
        a[:n] = a[n]
    return a
def sobel(x, window_len=7):
    """Sobel differential filter for calculating KDP.

    This solution has been taken from StackOverflow :cite:`Sobel-linfit`

    The input is mirrored at both ends, convolved with a linear ramp kernel
    running from -1 to 1 (normalized by the sum of its absolute values) and
    cut back to the input length.

    Returns
    -------
    output : differential signal (unscaled for gate spacing)
    """
    head = x[window_len - 1:0:-1]
    tail = x[-1:-window_len:-1]
    padded = np.r_[head, x, tail]

    ramp = 2.0 * np.arange(window_len) / (window_len - 1.0) - 1.0
    kernel = ramp / (abs(ramp).sum())

    response = np.convolve(kernel, padded, mode='valid')
    offset = int(window_len / 2)
    core = response[offset:len(x) + offset]
    return (-1.0 * core / (window_len / 3.0))
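# Source file: dqn_features.py
# Project: tensorflow-stock-prediction
# Author: weitingforyou
# (Page links: project source, file source.)
# (Page counters: views 26, favorites 0, likes 0, comments 0.)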
def get_moving_average(x, n, type_str):
    """Compute an n period moving average and return it as strings.

    ``type_str`` selects the weights:

    * ``'simple'``      - ``n`` equal weights
    * ``'exponential'`` - ``exp`` of ``n`` points spaced evenly in [-1, 0]
    * ``'weight'``      - the integers ``n, n - 1, ..., 1``

    The weights are normalized to sum to one.  The first ``n`` entries of
    the average are replaced by the value at index ``n``; inputs with ``n``
    or fewer samples have no such value and keep their partially windowed
    values.

    :param x: array_like of samples
    :param n: window length (int)
    :param type_str: weighting scheme, see above
    :returns: list of strings, one formatted value per input sample
    :raises ValueError: if ``type_str`` is not one of the supported names
    """
    x = np.asarray(x)
    if type_str == 'simple':
        weights = np.ones(n)
    elif type_str == 'exponential':
        weights = np.exp(np.linspace(-1., 0., n))
    elif type_str == 'weight':
        weights = np.flipud(np.arange(1, n + 1, dtype=float))
    else:
        raise ValueError(
            "type_str must be 'simple', 'exponential' or 'weight'")

    weights /= weights.sum()

    a = np.convolve(x, weights, mode='full')[:len(x)]
    if len(a) > n:
        a[:n] = a[n]
    # Wrap each scalar in a 0-d array so np.array_str always receives an
    # ndarray.
    return [np.array_str(np.asarray(value)) for value in a]
def testCausalConv(self):
    """Tests that the op is equivalent to a numpy implementation."""
    ramp = np.arange(1, 21, dtype=np.float32)
    # Two identical sequences of 20 samples with one channel each.
    batch = np.reshape(np.append(ramp, ramp), [2, 20, 1])
    taps = np.reshape(np.array([1, 1], dtype=np.float32), [2, 1, 1])
    out = causal_conv(batch, taps, 4)

    with self.test_session() as sess:
        result = sess.run(out)

        # Causal convolution using numpy
        expected = np.convolve(ramp, [1, 0, 0, 0, 1], mode='valid')
        expected = np.reshape(np.append(expected, expected), [2, 16, 1])

        self.assertAllEqual(result, expected)
def _convolve_or_correlate(f, a, v, mode, propagate_mask):
"""
Helper function for ma.correlate and ma.convolve
"""
if propagate_mask:
# results which are contributed to by either item in any pair being invalid
mask = (
f(getmaskarray(a), np.ones(np.shape(v), dtype=np.bool), mode=mode)
| f(np.ones(np.shape(a), dtype=np.bool), getmaskarray(v), mode=mode)
)
data = f(getdata(a), getdata(v), mode=mode)
else:
# results which are not contributed to by any pair of valid elements
mask = ~f(~getmaskarray(a), ~getmaskarray(v))
data = f(filled(a, 0), filled(v, 0), mode=mode)
return masked_array(data, mask=mask)
def _zseries_mul(z1, z2):
"""Multiply two z-series.
Multiply two z-series to produce a z-series.
Parameters
----------
z1, z2 : 1-D ndarray
The arrays must be 1-D but this is not checked.
Returns
-------
product : 1-D ndarray
The product z-series.
Notes
-----
This is simply convolution. If symmetric/anti-symmetric z-series are
denoted by S/A then the following rules apply:
S*S, A*A -> S
S*A, A*S -> A
"""
return np.convolve(z1, z2)
def plot_bitcoin():
    """Plot daily closing prices together with their 30-day moving average.

    Reads the first 1625 rows of ``coindesk-bpi-USD-close.csv``, smooths the
    ``Close`` column with a 30-sample boxcar window (``mode='valid'``),
    shifts the smoothed series right by half the window length with
    ``thinkdsp.shift_right`` and saves the figure under the root name
    ``convolution1``.
    """
    nrows = 1625
    frame = pandas.read_csv('coindesk-bpi-USD-close.csv',
                            nrows=nrows, parse_dates=[0])
    closes = frame.Close.values

    boxcar = numpy.ones(30)
    boxcar /= sum(boxcar)
    half_width = len(boxcar) // 2

    averaged = numpy.convolve(closes, boxcar, mode='valid')
    averaged = thinkdsp.shift_right(averaged, half_width)

    thinkplot.plot(closes, color='0.7', label='daily')
    thinkplot.plot(averaged, label='30 day average')
    thinkplot.config(xlabel='time (days)',
                     ylabel='price',
                     xlim=[0, nrows],
                     # ylim=[-60, 60],
                     loc='lower right')
    thinkplot.save(root='convolution1')
def MA(x, n, type='simple'):
    """
    compute an n period moving average.

    type is 'simple' | 'exponential'; anything other than 'simple' selects
    the exponential weights.

    The first ``n`` entries of the result (covering the warm-up region where
    the window is incomplete) are overwritten with the value at index
    ``n``.  If the input has ``n`` or fewer samples that value does not
    exist, and the partially windowed values are returned unchanged instead
    of raising ``IndexError``.

    :param x: array_like of samples
    :param n: window length (int)
    :param type: weighting scheme, see above
    :returns: numpy array with the same length as ``x``
    """
    x = np.asarray(x)
    if type == 'simple':
        weights = np.ones(n)
    else:
        weights = np.exp(np.linspace(-1., 0., n))

    weights /= weights.sum()

    a = np.convolve(x, weights, mode='full')[:len(x)]
    if len(a) > n:
        a[:n] = a[n]
    return a
def filter(self, X, Y):
    """Apply this object's Savitzky-Golay style filter to ``Y``.

    The data is first passed through ``self.simplefill`` (when
    ``self.interpolate`` is set) or ``self.sortxy``.  The filter
    coefficients are row ``self.deriv`` of the pseudo-inverse of the design
    matrix for a polynomial of degree ``self.order`` over
    ``self.window_size`` samples.  ``Y`` is padded at both ends with values
    derived from its own first and last samples so that the filtered output
    keeps its length.

    NOTE(review): the coefficients are passed to ``np.convolve`` without
    being reversed and without any derivative scaling; for odd ``deriv``
    the coefficients are antisymmetric, so the sign of the result is
    presumably flipped relative to the usual convention - confirm the
    intent.

    :param X: abscissa values
    :param Y: ordinate values to be filtered
    :returns: tuple ``(X, Y2)`` of the (possibly re-sorted or filled)
        abscissa and the filtered ordinate
    """
    if self.interpolate:
        X, Y = self.simplefill(X, Y)
    else:
        X, Y = self.sortxy(X, Y)
    order_range = list(range(self.order+1))
    half_window = (self.window_size - 1) // 2
    # precompute coefficients (plain ndarray; ``np.mat`` is no longer
    # available in current NumPy)
    b = np.array([[k**i for i in order_range]
                  for k in range(-half_window, half_window+1)], dtype=float)
    m = np.linalg.pinv(b)[self.deriv]
    # pad the signal at the extremes with
    # values taken from the signal itself
    firstvals = Y[0] - np.abs(Y[1:half_window+1][::-1] - Y[0])
    lastvals = Y[-1] + np.abs(Y[-half_window-1:-1][::-1] - Y[-1])
    Y1 = np.concatenate((firstvals, Y, lastvals))
    Y2 = np.convolve(m, Y1, mode='valid')
    return X, Y2