def lpfls(N,wp,ws,W):
M = (N-1)/2
nq = np.arange(0,2*M+1)
nb = np.arange(0,M+1)
q = (wp/np.pi)*np.sinc((wp/np.pi)*nq) - W*(ws/np.pi)*np.sinc((ws/np.pi)*nq)
b = (wp/np.pi)*np.sinc((wp/np.pi)*nb)
b[0] = wp/np.pi
q[0] = wp/np.pi + W*(1-ws/np.pi) # since sin(pi*n)/pi*n = 1, not 0
b = b.transpose()
Q1 = ln.toeplitz(q[0:M+1])
Q2 = ln.hankel(q[0:M+1],q[M:])
Q = Q1+Q2
a = ln.solve(Q,b)
h = list(nq)
for i in nb:
h[i] = 0.5*a[M-i]
h[N-1-i] = h[i]
h[M] = 2*h[M]
hmax = max(np.absolute(h))
for i in nq:
h[i] = (8191/hmax)*h[i]
return h
python类toeplitz()的实例源码
LSFIR.py 文件源码
项目:Least-Squared-Error-Based-FIR-Filters
作者: fourier-being
项目源码
文件源码
阅读 30
收藏 0
点赞 0
评论 0
LSFIR.py 文件源码
项目:Least-Squared-Error-Based-FIR-Filters
作者: fourier-being
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def bpfls(N,ws1,wp1,wp2,ws2,W):
M = (N-1)/2
nq = np.arange(0,2*M+1)
nb = np.arange(0,M+1)
q = W*np.sinc(nq) - (W*ws2/np.pi) * np.sinc(nq* (ws2/np.pi)) + (wp2/np.pi) * np.sinc(nq*(wp2/np.pi)) - (wp1/np.pi) * np.sinc(nq*(wp1/np.pi)) + (W*ws1/np.pi) * np.sinc(nq*(ws1/np.pi))
b = (wp2/np.pi)*np.sinc((wp2/np.pi)*nb) - (wp1/np.pi)*np.sinc((wp1/np.pi)*nb)
b[0] = wp2/np.pi - wp1/np.pi
q[0] = W - W*ws2/np.pi + wp2/np.pi - wp1/np.pi + W*ws1/np.pi # since sin(pi*n)/pi*n = 1, not 0
b = b.transpose()
Q1 = ln.toeplitz(q[0:M+1])
Q2 = ln.hankel(q[0:M+1],q[M:])
Q = Q1+Q2
a = ln.solve(Q,b)
h = list(nq)
for i in nb:
h[i] = 0.5*a[M-i]
h[N-1-i] = h[i]
h[M] = 2*h[M]
hmax = max(np.absolute(h))
for i in nq:
h[i] = (8191/hmax)*h[i]
return h
LSFIR.py 文件源码
项目:Least-Squared-Error-Based-FIR-Filters
作者: fourier-being
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def hpfls(N,ws,wp,W):
M = (N-1)/2
nq = np.arange(0,2*M+1)
nb = np.arange(0,M+1)
b = 1 - (wp/np.pi)* np.sinc(nb * wp/np.pi)
b[0] = 1- wp/np.pi
q = 1 - (wp/np.pi)* np.sinc(nq * wp/np.pi) + W * (ws/np.pi) * np.sinc(nq * ws/np.pi) # since sin(pi*n)/pi*n = 1, not 0
q[0] = b[0] + W* ws/np.pi
b = b.transpose()
Q1 = ln.toeplitz(q[0:M+1])
Q2 = ln.hankel(q[0:M+1],q[M:])
Q = Q1+Q2
a = ln.solve(Q,b)
h = list(nq)
for i in nb:
h[i] = 0.5*a[M-i]
h[N-1-i] = h[i]
h[M] = 2*h[M]
hmax = max(np.absolute(h))
for i in nq:
h[i] = (8191/hmax)*h[i]
return h
def corrmtx(x,m):
"""
from https://github.com/cokelaer/spectrum/
like matlab corrmtx(x,'mod'), with a different normalization factor.
"""
x = np.asarray(x, dtype=float)
assert x.ndim == 1, '1-D only'
N = x.size
Tp = toeplitz(x[m:N], x[m::-1])
C = np.zeros((2*(N-m), m+1), dtype=x.dtype)
for i in range(0, N-m):
C[i] = Tp[i]
Tp = np.fliplr(Tp.conj())
for i in range(N-m, 2*(N-m)):
C[i] = Tp[i-N+m]
return C
def aryule(c, k):
"""Solve Yule-Walker equation.
Args:
c (numpy array): Coefficients (i.e. autocorrelation)
k (int): Assuming the AR(k) model
Returns:
numpy array: k model parameters
Some formulations solve: C a = -c,
but we actually solve C a = c.
"""
a = np.zeros(k)
# ignore a singular matrix
C = toeplitz(c[:k])
if not np.all(C == 0.0) and np.isfinite(ln.cond(C)):
a = np.dot(ln.inv(C), c[1:])
return a
def toepz(self):
cormat = np.zeros((self.nkdim, self.nkdim))
epsilon = (1 - np.max(self.rho)) / (1 + np.max(self.rho)) - .01
for i in np.arange(self.k):
t = np.insert(np.power(self.rho[i], np.arange(1, self.nk[i])), 0, 1)
cor = toeplitz(t)
if i == 0:
cormat[0:self.nk[0], 0:self.nk[0]] = cor
if i != 0:
cormat[np.sum(self.nk[0:i]):np.sum(self.nk[0:i + 1]),
np.sum(self.nk[0:i]):np.sum(self.nk[0:i + 1])] = cor
np.fill_diagonal(cormat, 1 - epsilon)
cormat = self._generate_noise(cormat, self.nkdim, self.M, epsilon)
return cormat
def hub(self):
cormat = np.zeros((self.nkdim, self.nkdim))
for i in np.arange(self.k):
cor = toeplitz(self._fill_hub_matrix(self.rho[i,0],self.rho[i,1], self.power, self.nk[i]))
if i == 0:
cormat[0:self.nk[0], 0:self.nk[0]] = cor
if i != 0:
cormat[np.sum(self.nk[0:i]):np.sum(self.nk[0:i + 1]),
np.sum(self.nk[0:i]):np.sum(self.nk[0:i + 1])] = cor
tau = (np.max(self.rho[i]) - np.min(self.rho[i])) / (self.nk[i] - 2)
epsilon = 0.08 #(1 - np.min(rho) - 0.75 * np.min(tau)) - 0.01
np.fill_diagonal(cormat, 1 - epsilon)
cormat = self._generate_noise(cormat, self.nkdim, self.M, epsilon)
return cormat
def aryule(c, k):
"""Solve Yule-Walker equation.
Args:
c (numpy array): Coefficients (i.e. autocorrelation)
k (int): Assuming the AR(k) model
Returns:
numpy array: k model parameters
Some formulations solve: C a = -c,
but we actually solve C a = c.
"""
a = np.zeros(k)
# ignore a singular matrix
C = toeplitz(c[:k])
if not np.all(C == 0.0) and np.isfinite(ln.cond(C)):
a = np.dot(ln.inv(C), c[1:])
return a
LSFIR.py 文件源码
项目:Least-Squared-Error-Based-FIR-Filters
作者: fourier-being
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def lpfls2notch(N,wp,ws,wn1,wn2,W):
M = (N-1)/2
nq = np.arange(0,2*M+1)
nb = np.arange(0,M+1)
q = (wp/np.pi)*np.sinc((wp/np.pi)*nq) - W*(ws/np.pi)*np.sinc((ws/np.pi)*nq)
b = (wp/np.pi)*np.sinc((wp/np.pi)*nb)
q[0] = wp/np.pi + W*(1-ws/np.pi) # since sin(pi*n)/pi*n = 1, not 0
b = np.asmatrix(b)
b = b.transpose()
Q1 = ln.toeplitz(q[0:M+1])
Q2 = ln.hankel(q[0:M+1],q[M:])
Q = Q1+Q2
G1 = np.cos(wn1*nb)
G2 = np.cos(wn2*nb)
G = np.matrix([G1,G2])
d = np.array([0,0])
d = np.asmatrix(d)
d = d.transpose()
c = np.asmatrix(ln.solve(Q,b))
mu = ln.solve(G*ln.inv(Q)*G.transpose(),G*c - d)
a = c - ln.solve(Q,G.transpose()*mu)
h = np.zeros(N)
for i in nb:
h[i] = 0.5*a[M-i]
h[N-1-i] = h[i]
h[M] = 2*h[M]
hmax = max(np.absolute(h))
for i in nq:
h[i] = (8191/hmax)*h[i]
return h
LSFIR.py 文件源码
项目:Least-Squared-Error-Based-FIR-Filters
作者: fourier-being
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def lpfls1notch(N,wp,ws,wn1,W):
M = (N-1)/2
nq = np.arange(0,2*M+1)
nb = np.arange(0,M+1)
q = (wp/np.pi)*np.sinc((wp/np.pi)*nq) - W*(ws/np.pi)*np.sinc((ws/np.pi)*nq)
b = (wp/np.pi)*np.sinc((wp/np.pi)*nb)
q[0] = wp/np.pi + W*(1-ws/np.pi) # since sin(pi*n)/pi*n = 1, not 0
b = np.asmatrix(b)
b = b.transpose()
Q1 = ln.toeplitz(q[0:M+1])
Q2 = ln.hankel(q[0:M+1],q[M:])
Q = Q1+Q2
G1 = np.cos(wn1*nb)
G = np.matrix([G1])
d = np.array([0])
d = np.asmatrix(d)
c = np.asmatrix(ln.solve(Q,b))
mu = ln.solve(G*ln.inv(Q)*G.transpose(),G*c - d)
a = c - ln.solve(Q,G.transpose()*mu)
h = np.zeros(N)
for i in nb:
h[i] = 0.5*a[M-i]
h[N-1-i] = h[i]
h[M] = 2*h[M]
hmax = max(np.absolute(h))
for i in nq:
h[i] = (8191/hmax)*h[i]
return h
def solve_random_qp(n, solver):
M, b = random.random((n, n)), random.random(n)
P, q = dot(M.T, M), dot(b, M).reshape((n,))
G = toeplitz([1., 0., 0.] + [0.] * (n - 3), [1., 2., 3.] + [0.] * (n - 3))
h = ones(n)
return solve_qp(P, q, G, h, solver=solver)
def damped_lstsq(a,b,damping=1.0,plot=False,return_G=False):
'''
Gm = d
G : discrete convolution matrix
m : signal we are trying to recover (receiver function)
d : the convolved data (signal a)
m = (G^T G)^(-1)G^T * d
'''
#build G
padding = np.zeros(a.shape[0] - 1, a.dtype)
first_col = np.r_[a, padding]
first_row = np.r_[a[0], padding]
G = toeplitz(first_col, first_row)
#reshape b
shape = G.shape
shape = shape[0]
len_b = len(b)
zeros = np.zeros((shape-len_b))
b = np.append(b,zeros)
#solve system (use either lsmr or scipy solve)
#sol = lsmr(G,b,damp=damping)
sol = np.linalg.solve(G+damping*np.eye(G.shape[0]),b,overwrite_a=False,overwrite_b=False)
m_est = sol[0]
rf = m_est
if plot==True:
fig,axes = plt.subplots(3,sharex=True)
axes[0].plot(a)
axes[1].plot(b)
axes[2].plot(rf)
plt.show()
if return_G==True:
return G
else:
return rf
def damped_lstsq(a,b,damping=1.0,plot=False):
'''
Gm = d
G : discrete convolution matrix
m : signal we are trying to recover (receiver function)
d : the convolved data (signal a)
m = (G^T G)^(-1)G^T * d
'''
#build G
padding = np.zeros(a.shape[0] - 1, a.dtype)
first_col = np.r_[a, padding]
first_row = np.r_[a[0], padding]
G = toeplitz(first_col, first_row)
#reshape b
shape = G.shape
shape = shape[0]
len_b = len(b)
zeros = np.zeros((shape-len_b))
b = np.append(b,zeros)
#solve with scipy.sparse.linalg.lsmr
sol = lsmr(G,b,damp=damping)
m_est = sol[0]
rf = m_est
if plot==True:
fig,axes = plt.subplots(3,sharex=True)
axes[0].plot(a)
axes[1].plot(b)
axes[2].plot(rf)
plt.show()
return rf
def damped_lstsq(a,b,damping=1.0,plot=False):
'''
Gm = d
G : discrete convolution matrix
m : signal we are trying to recover (receiver function)
d : the convolved data (signal a)
m = (G^T G)^(-1)G^T * d
'''
#build G
padding = np.zeros(a.shape[0] - 1, a.dtype)
first_col = np.r_[a, padding]
first_row = np.r_[a[0], padding]
G = toeplitz(first_col, first_row)
#reshape b
shape = G.shape
shape = shape[0]
len_b = len(b)
zeros = np.zeros((shape-len_b))
b = np.append(b,zeros)
#solve with scipy.sparse.linalg.lsmr
sol = lsmr(G,b,damp=damping)
m_est = sol[0]
rf = m_est
if plot==True:
fig,axes = plt.subplots(3,sharex=True)
axes[0].plot(a)
axes[1].plot(b)
axes[2].plot(rf)
plt.show()
return rf
def generate_data(n_samples, n_features, size_groups, rho=0.5,
random_state=24):
""" Data generation process with Toplitz like correlated features:
this correspond to the synthetic dataset used in our paper
"GAP Safe Screening Rules for Sparse-Group Lasso".
"""
rng = check_random_state(random_state)
n_groups = len(size_groups)
# g_start = np.zeros(n_groups, order='F', dtype=np.intc)
# for i in range(1, n_groups):
# g_start[i] = size_groups[i - 1] + g_start[i - 1]
g_start = np.cumsum(size_groups, dtype=np.intc) - size_groups[0]
# 10% of groups are actives
gamma1 = int(np.ceil(n_groups * 0.1))
selected_groups = rng.random_integers(0, n_groups - 1, gamma1)
true_beta = np.zeros(n_features)
for i in selected_groups:
begin = g_start[i]
end = g_start[i] + size_groups[i]
# 10% of features are actives
gamma2 = int(np.ceil(size_groups[i] * 0.1))
selected_features = rng.random_integers(begin, end - 1, gamma2)
ns = len(selected_features)
s = 2 * rng.rand(ns) - 1
u = rng.rand(ns)
true_beta[selected_features] = np.sign(s) * (10 * u + (1 - u) * 0.5)
vect = rho ** np.arange(n_features)
covar = toeplitz(vect, vect)
X = rng.multivariate_normal(np.zeros(n_features), covar, n_samples)
y = np.dot(X, true_beta) + 0.01 * rng.normal(0, 1, n_samples)
return X, y
def Tmtx_ri(b_ri, K, D, L):
"""
build convolution matrix associated with b_ri
:param b_ri: a real-valued vector
:param K: number of Diracs
:param D1: expansion matrix for the real-part
:param D2: expansion matrix for the imaginary-part
:return:
"""
b_ri = np.dot(D, b_ri)
b_r = b_ri[:L]
b_i = b_ri[L:]
Tb_r = linalg.toeplitz(b_r[K:], b_r[K::-1])
Tb_i = linalg.toeplitz(b_i[K:], b_i[K::-1])
return np.vstack((np.hstack((Tb_r, -Tb_i)), np.hstack((Tb_i, Tb_r))))
def Rmtx_ri(coef_ri, K, D, L):
coef_ri = np.squeeze(coef_ri)
coef_r = coef_ri[:K + 1]
coef_i = coef_ri[K + 1:]
R_r = linalg.toeplitz(np.concatenate((np.array([coef_r[-1]]),
np.zeros(L - K - 1))),
np.concatenate((coef_r[::-1],
np.zeros(L - K - 1)))
)
R_i = linalg.toeplitz(np.concatenate((np.array([coef_i[-1]]),
np.zeros(L - K - 1))),
np.concatenate((coef_i[::-1],
np.zeros(L - K - 1)))
)
return np.dot(np.vstack((np.hstack((R_r, -R_i)), np.hstack((R_i, R_r)))), D)
def estimate(self, nbcorr=np.nan, numpsd=-1):
fft_length, _ = self.check_params()
if np.isnan((nbcorr)):
nbcorr = self.ordar
# -------- estimate correlation from psd
full_psd = self.psd[numpsd]
full_psd = np.c_[full_psd, np.conjugate(full_psd[:, :0:-1])]
correl = fftpack.ifft(full_psd[0], fft_length, 0).real
# -------- estimate AR part
col1 = correl[self.ordma:self.ordma + nbcorr]
row1 = correl[np.abs(
np.arange(self.ordma, self.ordma - self.ordar, -1))]
R = linalg.toeplitz(col1, row1)
r = -correl[self.ordma + 1:self.ordma + nbcorr + 1]
AR = linalg.solve(R, r)
self.AR_ = AR
# -------- estimate correlation of MA part
# -------- estimate MA part
if self.ordma == 0:
sigma2 = correl[0] + np.dot(AR, correl[1:self.ordar + 1])
self.MA = np.ones(1) * np.sqrt(sigma2)
else:
raise NotImplementedError(
'arma: estimation of the MA part not yet implemented')
def least_square_evoked(epochs, return_toeplitz=False):
"""Least square estimation of evoked response from a Epochs instance.
Parameters
----------
epochs : Epochs instance
An instance of Epochs.
return_toeplitz : bool (default False)
If true, compute the toeplitz matrix.
Returns
-------
evokeds : dict of evoked instance
An dict of evoked instance for each event type in epochs.event_id.
toeplitz : dict of ndarray
If return_toeplitz is true, return the toeplitz matrix for each event
type in epochs.event_id.
"""
if not isinstance(epochs, _BaseEpochs):
raise ValueError('epochs must be an instance of `mne.Epochs`')
events = epochs.events.copy()
events[:, 0] -= (np.min(events[:, 0]) +
int(epochs.tmin * epochs.info['sfreq']))
data = _construct_signal_from_epochs(epochs)
evoked_data, toeplitz = _least_square_evoked(data, events, epochs.event_id,
tmin=epochs.tmin,
tmax=epochs.tmax,
sfreq=epochs.info['sfreq'])
evokeds = dict()
info = cp.deepcopy(epochs.info)
for name, data in evoked_data.items():
n_events = len(events[events[:, 2] == epochs.event_id[name]])
evoked = EvokedArray(data, info, tmin=epochs.tmin,
comment=name, nave=n_events)
evokeds[name] = evoked
if return_toeplitz:
return evokeds, toeplitz
return evokeds
def roots(self):
"""
Utilises Boyd's O(n^2) recursive subdivision algorithm. The chebfun
is recursively subsampled until it is successfully represented to
machine precision by a sequence of piecewise interpolants of degree
100 or less. A colleague matrix eigenvalue solve is then applied to
each of these pieces and the results are concatenated.
See:
J. P. Boyd, Computing zeros on a real interval through Chebyshev
expansion and polynomial rootfinding, SIAM J. Numer. Anal., 40
(2002), pp. 1666–1682.
"""
if self.size() == 1:
return np.array([])
elif self.size() <= 100:
ak = self.coefficients()
v = np.zeros_like(ak[:-1])
v[1] = 0.5
C1 = linalg.toeplitz(v)
C2 = np.zeros_like(C1)
C1[0,1] = 1.
C2[-1,:] = ak[:-1]
C = C1 - .5/ak[-1] * C2
eigenvalues = linalg.eigvals(C)
roots = [eig.real for eig in eigenvalues
if np.allclose(eig.imag,0,atol=1e-10)
and np.abs(eig.real) <=1]
scaled_roots = self._ui_to_ab(np.array(roots))
return scaled_roots
else:
# divide at a close-to-zero split-point
split_point = self._ui_to_ab(0.0123456789)
return np.concatenate(
(self.restrict([self._domain[0],split_point]).roots(),
self.restrict([split_point,self._domain[1]]).roots())
)
# ----------------------------------------------------------------
# Interpolation and evaluation (go from values to coefficients)
# ----------------------------------------------------------------
def _create_Toeplitz(data, npts_stf):
# Desired dimensions: len(data) + npts_stf - 1 x npts_stf
nrows = npts_stf + len(data) - 1
data_col = data
first_col = np.r_[data_col,
np.zeros(nrows - len(data_col))]
ncols = npts_stf
first_row = np.r_[data[0], np.zeros(ncols - 1)]
return toeplitz(first_col, first_row)
def pade(time,dipole):
damp_const = 100.0
dipole = np.asarray(dipole) - dipole[0]
stepsize = time[1] - time[0]
#print dipole
damp = np.exp(-(stepsize*np.arange(len(dipole)))/float(damp_const))
dipole *= damp
M = len(dipole)
N = int(np.floor(M / 2))
#print "N = ", N
num_pts = 20000
if N > num_pts:
N = num_pts
#print "Trimmed points to: ", N
# G and d are (N-1) x (N-1)
# d[k] = -dipole[N+k] for k in range(1,N)
d = -dipole[N+1:2*N]
try:
from scipy.linalg import toeplitz, solve_toeplitz
except ImportError:
print("You'll need SciPy version >= 0.17.0")
try:
# Instead, form G = (c,r) as toeplitz
#c = dipole[N:2*N-1]
#r = np.hstack((dipole[1],dipole[N-1:1:-1]))
b = solve_toeplitz((dipole[N:2*N-1],\
np.hstack((dipole[1],dipole[N-1:1:-1]))),d,check_finite=False)
except np.linalg.linalg.LinAlgError:
# OLD CODE: sometimes more stable
# G[k,m] = dipole[N - m + k] for m,k in range(1,N)
G = dipole[N + np.arange(1,N)[:,None] - np.arange(1,N)]
b = np.linalg.solve(G,d)
# Now make b Nx1 where b0 = 1
b = np.hstack((1,b))
# b[m]*dipole[k-m] for k in range(0,N), for m in range(k)
a = np.dot(np.tril(toeplitz(dipole[0:N])),b)
p = np.poly1d(a)
q = np.poly1d(b)
# If you want energies greater than 2*27.2114 eV, you'll need to change
# the default frequency range to something greater.
#frequency = np.arange(0.00,2.0,0.00005)
frequency = np.arange(0.3,0.75,0.0002)
W = np.exp(-1j*frequency*stepsize)
fw = p(W)/q(W)
return fw, frequency
def _project(reference_sources, estimated_source, flen):
"""Least-squares projection of estimated source on the subspace spanned by
delayed versions of reference sources, with delays between 0 and flen-1
"""
nsrc = reference_sources.shape[0]
nsampl = reference_sources.shape[1]
# computing coefficients of least squares problem via FFT ##
# zero padding and FFT of input data
reference_sources = np.hstack((reference_sources,
np.zeros((nsrc, flen - 1))))
estimated_source = np.hstack((estimated_source, np.zeros(flen - 1)))
n_fft = int(2**np.ceil(np.log2(nsampl + flen - 1.)))
sf = scipy.fftpack.fft(reference_sources, n=n_fft, axis=1)
sef = scipy.fftpack.fft(estimated_source, n=n_fft)
# inner products between delayed versions of reference_sources
G = np.zeros((nsrc * flen, nsrc * flen))
for i in range(nsrc):
for j in range(nsrc):
ssf = sf[i] * np.conj(sf[j])
ssf = np.real(scipy.fftpack.ifft(ssf))
ss = toeplitz(np.hstack((ssf[0], ssf[-1:-flen:-1])),
r=ssf[:flen])
G[i * flen: (i+1) * flen, j * flen: (j+1) * flen] = ss
G[j * flen: (j+1) * flen, i * flen: (i+1) * flen] = ss.T
# inner products between estimated_source and delayed versions of
# reference_sources
D = np.zeros(nsrc * flen)
for i in range(nsrc):
ssef = sf[i] * np.conj(sef)
ssef = np.real(scipy.fftpack.ifft(ssef))
D[i * flen: (i+1) * flen] = np.hstack((ssef[0], ssef[-1:-flen:-1]))
# Computing projection
# Distortion filters
try:
C = np.linalg.solve(G, D).reshape(flen, nsrc, order='F')
except np.linalg.linalg.LinAlgError:
C = np.linalg.lstsq(G, D)[0].reshape(flen, nsrc, order='F')
# Filtering
sproj = np.zeros(nsampl + flen - 1)
for i in range(nsrc):
sproj += fftconvolve(C[:, i], reference_sources[i])[:nsampl + flen - 1]
return sproj
def estimate_time_constant(Y, sn, p = None, lags = 5, include_noise = False, pixels = None):
"""
Estimating global time constants for the dataset Y through the autocovariance function (optional).
The function is no longer used in the standard setting of the algorithm since every trace has its own
time constant.
Inputs:
Y: np.ndarray (2D)
input movie data with time in the last axis
p: positive integer
order of AR process, default: 2
lags: positive integer
number of lags in the past to consider for determining time constants. Default 5
include_noise: Boolean
Flag to include pre-estimated noise value when determining time constants. Default: False
pixels: np.ndarray
Restrict estimation to these pixels (e.g., remove saturated pixels). Default: All pixels
Output:
g: np.ndarray (p x 1)
Discrete time constants
"""
if p is None:
raise Exception("You need to define p")
if pixels is None:
pixels = np.arange(np.size(Y)/np.shape(Y)[-1])
from scipy.linalg import toeplitz
npx = len(pixels)
g = 0
lags += p
XC = np.zeros((npx,2*lags+1))
for j in range(npx):
XC[j,:] = np.squeeze(axcov(np.squeeze(Y[pixels[j],:]),lags))
gv = np.zeros(npx*lags)
if not include_noise:
XC = XC[:,np.arange(lags-1,-1,-1)]
lags -= p
A = np.zeros((npx*lags,p))
for i in range(npx):
if not include_noise:
A[i*lags+np.arange(lags),:] = toeplitz(np.squeeze(XC[i,np.arange(p-1,p+lags-1)]),np.squeeze(XC[i,np.arange(p-1,-1,-1)]))
else:
A[i*lags+np.arange(lags),:] = toeplitz(np.squeeze(XC[i,lags+np.arange(lags)]),np.squeeze(XC[i,lags+np.arange(p)])) - (sn[i]**2)*np.eye(lags,p)
gv[i*lags+np.arange(lags)] = np.squeeze(XC[i,lags+1:])
if not include_noise:
gv = XC[:,p:].T
gv = np.squeeze(np.reshape(gv,(np.size(gv),1),order='F'))
g = np.dot(np.linalg.pinv(A),gv)
return g
def solve_unit_norm_dual(lhs, rhs, lambd0, factr=1e7, debug=False,
lhs_is_toeplitz=False):
if np.all(rhs == 0):
return np.zeros(lhs.shape[0]), 0.
n_atoms = lambd0.shape[0]
n_times_atom = lhs.shape[0] // n_atoms
# precompute SVD
# U, s, V = linalg.svd(lhs)
if lhs_is_toeplitz:
# first column of the toeplitz matrix lhs
lhs_c = lhs[0, :]
# lhs will not stay toeplitz if we add different lambd on the diagonal
assert n_atoms == 1
def x_star(lambd):
lambd += 1e-14 # avoid numerical issues
# lhs_inv = np.dot(V.T / (s + np.repeat(lambd, n_times_atom)), U.T)
# return np.dot(lhs_inv, rhs)
lhs_c_copy = lhs_c.copy()
lhs_c_copy[0] += lambd
return linalg.solve_toeplitz(lhs_c_copy, rhs)
else:
def x_star(lambd):
lambd += 1e-14 # avoid numerical issues
# lhs_inv = np.dot(V.T / (s + np.repeat(lambd, n_times_atom)), U.T)
# return np.dot(lhs_inv, rhs)
return linalg.solve(lhs + np.diag(np.repeat(lambd, n_times_atom)),
rhs)
def dual(lambd):
x_hats = x_star(lambd)
norms = linalg.norm(x_hats.reshape(-1, n_times_atom), axis=1)
return (x_hats.T.dot(lhs).dot(x_hats) - 2 * rhs.T.dot(x_hats) + np.dot(
lambd, norms ** 2 - 1.))
def grad_dual(lambd):
x_hats = x_star(lambd).reshape(-1, n_times_atom)
return linalg.norm(x_hats, axis=1) ** 2 - 1.
def func(lambd):
return -dual(lambd)
def grad(lambd):
return -grad_dual(lambd)
bounds = [(0., None) for idx in range(0, n_atoms)]
if debug:
assert optimize.check_grad(func, grad, lambd0) < 1e-5
lambd_hats, _, _ = optimize.fmin_l_bfgs_b(func, x0=lambd0, fprime=grad,
bounds=bounds, factr=factr)
x_hat = x_star(lambd_hats)
return x_hat, lambd_hats
def fit(self, X, Y):
X = np.asarray(X)
Y = np.asarray(Y)
assert X.shape[1] == self.num_feat
assert Y.shape[1] == self.num_pred
assert X.shape[0] == Y.shape[0]
# Store output minimum and maximum values
self._output_range = np.max(Y, axis = 0) - np.min(Y, axis = 0)
self._output_max = np.max(Y, axis = 0)
self._output_min = np.min(Y, axis = 0)
# Center and standardize inputs
X = self._center_input(X)
X = self._standardize_input(X)
# Center and standardize outputs
Y = self._center_output(Y)
Y = self._standardize_output(Y)
numio = self.total_io
R = self._covf(np.hstack((X,Y)),self.num_lags)
PHI = np.empty((2*self.num_lags-1,numio**2), dtype = float, order='C')
for ii in xrange(numio):
for jj in xrange(numio):
PHI[:,ii+jj*numio] = np.hstack((R[jj+ii*numio,np.arange(self.num_lags-1,0,-1)], R[ii+jj*numio,:]))
Nxxr = np.arange(self.num_lags-1, 2*(self.num_lags-1)+1,1)
Nxxc = np.arange(self.num_lags-1,-1,-1)
Nxy = np.arange(self.num_lags-1, 2*(self.num_lags-1)+1)
# Solve matrix equations to identify filters
PX = np.empty((self.num_feat*self.num_lags,self.num_feat*self.num_lags), dtype=float, order='C')
for ii in xrange(self.num_feat):
for jj in xrange(self.num_feat):
c_start = ii*self.num_lags
c_end = (ii+1)*self.num_lags
r_start = jj*self.num_lags
r_end = (jj+1)*self.num_lags
PX[r_start:r_end,c_start:c_end] = toeplitz(PHI[Nxxc,ii+(jj)*numio],PHI[Nxxr,ii+(jj)*numio])
PXY = np.empty((self.num_feat*self.num_lags, self.num_pred), dtype=float, order='C')
for ii in xrange(self.num_feat):
for jj in xrange(self.num_feat,self.num_feat+self.num_pred,1):
r_start = ii*self.num_lags
r_end = (ii+1)*self.num_lags
c_ind = jj-self.num_feat
PXY[r_start:r_end,c_ind] = PHI[Nxy,ii+(jj)*numio]
self.H = np.linalg.solve((PX + self.reg_lambda*np.ones_like(PX)), PXY)
def _least_square_evoked(data, events, event_id, tmin, tmax, sfreq):
"""Least square estimation of evoked response from data.
Parameters
----------
data : ndarray, shape (n_channels, n_times)
The data to estimates evoked
events : ndarray, shape (n_events, 3)
The events typically returned by the read_events function.
If some events don't match the events of interest as specified
by event_id, they will be ignored.
event_id : dict
The id of the events to consider
tmin : float
Start time before event.
tmax : float
End time after event.
sfreq : float
Sampling frequency.
Returns
-------
evokeds_data : dict of ndarray
A dict of evoked data for each event type in event_id.
toeplitz : dict of ndarray
A dict of toeplitz matrix for each event type in event_id.
"""
nmin = int(tmin * sfreq)
nmax = int(tmax * sfreq)
window = nmax - nmin
n_samples = data.shape[1]
toeplitz_mat = dict()
full_toep = list()
for eid in event_id:
# select events by type
ix_ev = events[:, -1] == event_id[eid]
# build toeplitz matrix
trig = np.zeros((n_samples, 1))
ix_trig = (events[ix_ev, 0]) + nmin
trig[ix_trig] = 1
toep_mat = linalg.toeplitz(trig[0:window], trig)
toeplitz_mat[eid] = toep_mat
full_toep.append(toep_mat)
# Concatenate toeplitz
full_toep = np.concatenate(full_toep)
# least square estimation
predictor = np.dot(linalg.pinv(np.dot(full_toep, full_toep.T)), full_toep)
all_evokeds = np.dot(predictor, data.T)
all_evokeds = np.vsplit(all_evokeds, len(event_id))
# parse evoked response
evoked_data = dict()
for idx, eid in enumerate(event_id):
evoked_data[eid] = all_evokeds[idx].T
return evoked_data, toeplitz_mat