def make_node(self, frames, n, axis):
"""
Compute an n-point fft of frames along given axis.
"""
_frames = tensor.as_tensor(frames, ndim=2)
_n = tensor.as_tensor(n, ndim=0)
_axis = tensor.as_tensor(axis, ndim=0)
if self.half and _frames.type.dtype.startswith('complex'):
raise TypeError('Argument to HalfFFT must not be complex', frames)
spectrogram = tensor.zmatrix()
buf = generic()
# The `buf` output is present for future work
# when we call FFTW directly and re-use the 'plan' that FFTW creates.
# In that case, buf would store a CObject encapsulating the plan.
rval = Apply(self, [_frames, _n, _axis], [spectrogram, buf])
return rval
python类fft()的实例源码
def perform(self, node, inp, out):
frames, n, axis = inp
spectrogram, buf = out
if self.inverse:
fft_fn = numpy.fft.ifft
else:
fft_fn = numpy.fft.fft
fft = fft_fn(frames, int(n), int(axis))
if self.half:
M, N = fft.shape
if axis == 0:
if (M % 2):
raise ValueError(
'halfFFT on odd-length vectors is undefined')
spectrogram[0] = fft[0:M / 2, :]
elif axis == 1:
if (N % 2):
raise ValueError(
'halfFFT on odd-length vectors is undefined')
spectrogram[0] = fft[:, 0:N / 2]
else:
raise NotImplementedError()
else:
spectrogram[0] = fft
def spike_lfp_filters(spikes,lfp):
'''
TODO: documentation
Cross-spectral densities between spikes and LFP
NTrials,NNeurons,NSamples = np.shape(spikes)
Parameters
----------
Returns
-------
'''
NTrials,NNeurons,NSamples = np.shape(spikes)
# precomute lfp fft
window = hanning(NSamples)
lfpmean = np.mean(lfp)
spikemean = np.mean(spikes,(0,2))
fftlfp = [fft((lfp[t]-np.mean(lfp[t]))*window) for t in np.arange(NTrials)]
result = np.zeros((NSamples,NNeurons),dtype=np.complex128)
for i in np.arange(NNeurons):
cspectra = [np.conj(fft((spikes[t,i,:]-np.mean(spikes[t,i,:]))*window))*fftlfp[t] for t in np.arange(NTrials)]
result[:,i] = np.mean(cspectra,0)
return result
def spectreconstruct(k,B,spikes=None,fftspikes=None):
'''
TODO: documentation
Reconstructs LFP from spikes using cross-spectral matrix.
Can optionally pass the fts if they are already available
NTrials,NNeurons,NSamples = np.shape(spikes)
Parameters
----------
Returns
-------
'''
if spikes!=None:
NTrials,NNeurons,NSamples = np.shape(spikes)
else:
NTrials,NNeurons,NSamples = np.shape(fftspikes)
if ffts==None:
assert spikes!=None
fftspikes = np.array([[fft(trial[i]-np.mean(trial[i])) for i in np.arange(NNeurons)] for trial in spikes])
result = [ifft(sum(fftspikes[t]*B.T,0)) for t in np.arange(NTrials)]
return result
def diffusion_basis(N=range(1,6),t=np.arange(100)):
'''
Note: conceptually similar to other basis functions in this file
with base=2 and offset=1
repeatly convolves exponential with itself to generate basis
Parameters
----------
Returns
-------
'''
print('THIS IS BAD')
assert 0
normalize = lambda x:x/np.sum(x)
first = np.fft(np.exp(-t))
kernels = [normalize(np.real(np.ifft(first**(2**(1+(n-1)*0.5))))) for n in N]
return np.array(kernels)
def delta(N):
'''
Parameters
----------
Returns
-------
'''
# Get discrete delta but make it even
delta = np.zeros((N,))
if (N%2==1):
delta[N//2]=1
else:
delta[N//2+1]=delta[N//2]=0.5
x = np.fft(delta)
return x
def fft(a, b=None, axis=0, threads=1, **kw):
if b is None:
return numpy.fft.fft(a, axis=axis)
else:
b[:] = numpy.fft.fft(a, axis=axis)
return b
def ifft(a, b=None, axis=0, threads=1, **kw):
if b is None:
return numpy.fft.ifft(a, axis=axis)
else:
b[:] = numpy.fft.ifft(a, axis=axis)
return b
def rfft(a, b=None, axis=0, threads=1, **kw):
if b is None:
return numpy.fft.rfft(a, axis=axis)
else:
b[:] = numpy.fft.rfft(a, axis=axis)
return b
def irfft(a, b=None, axis=0, threads=1, **kw):
if b is None:
return numpy.fft.irfft(a, axis=axis)
else:
b[:] = numpy.fft.irfft(a, axis=axis)
return b
def fft2(a, b=None, axes=(0, 1), threads=1, **kw):
if b is None:
return numpy.fft.fft2(a, axes=axes)
else:
b[:] = numpy.fft.fft2(a, axes=axes)
return b
def rfft2(a, b=None, axes=(0, 1), threads=1, **kw):
if b is None:
return numpy.fft.rfft2(a, axes=axes)
else:
b[:] = numpy.fft.rfft2(a, axes=axes)
return b
def irfft2(a, b=None, axes=(0, 1), threads=1, **kw):
if b is None:
return numpy.fft.irfft2(a, axes=axes)
else:
b[:] = numpy.fft.irfft2(a, axes=axes)
return b
def fftn(a, b=None, axes=(0, 1, 2), threads=1, **kw):
if b is None:
return numpy.fft.fftn(a, axes=axes)
else:
b[:] = numpy.fft.fftn(a, axes=axes)
return b
def ifftn(a, b=None, axes=(0, 1, 2), threads=1, **kw):
if b is None:
return numpy.fft.ifftn(a, axes=axes)
else:
b[:] = numpy.fft.ifftn(a, axes=axes)
return b
def rfftn(a, b=None, axes=(0, 1, 2), threads=1, **kw):
if b is None:
return numpy.fft.rfftn(a, axes=axes)
else:
b[:] = numpy.fft.rfftn(a, axes=axes)
return b
def _xx2k(self ):
"""
Private: oversampled FFT on the heterogeneous device
Firstly, zeroing the self.k_Kd array
Second, copy self.x_Nd array to self.k_Kd array by cSelect
Third: inplace FFT
"""
self.cMultiplyScalar(self.zero_scalar, self.k_Kd, local_size=None, global_size=int(self.Kdprod))
self.cSelect(self.NdGPUorder, self.KdGPUorder, self.x_Nd, self.k_Kd, local_size=None, global_size=int(self.Ndprod))
self.fft( self.k_Kd,self.k_Kd,inverse=False)
self.thr.synchronize()
def _k2xx(self):
"""
Private: the inverse FFT and image cropping (which is the reverse of _xx2k() method)
"""
self.fft( self.k_Kd2, self.k_Kd2,inverse=True)
# self.x_Nd._zero_fill()
self.cMultiplyScalar(self.zero_scalar, self.x_Nd, local_size=None, global_size=int(self.Ndprod ))
# self.cSelect(self.queue, (self.Ndprod,), None, self.KdGPUorder.data, self.NdGPUorder.data, self.k_Kd2.data, self.x_Nd.data )
self.cSelect( self.KdGPUorder, self.NdGPUorder, self.k_Kd2, self.x_Nd, local_size=None, global_size=int(self.Ndprod ))
self.thr.synchronize()
def xx2k(self, xx):
'''
fft of the image
input:
xx: scaled 2D image
output:
k: k-space grid
'''
# dd = numpy.size(self.st['Kd'])
output_x = numpy.zeros(self.st['Kd'], dtype=self.dtype,order='C')
# output_x[crop_slice_ind(xx.shape)] = xx
output_x.flat[self.KdCPUorder]=xx.flat[self.NdCPUorder]
k = numpy.fft.fftn(output_x, self.st['Kd'], range(0, self.ndims))
return k
def k2xx(self, k):
'''
Transform regular k-space (Kd array) to scaled image xx (Nd array)
'''
# dd = numpy.size(self.st['Kd'])
k = numpy.fft.ifftn(k, self.st['Kd'], range(0, self.ndims))
xx= numpy.zeros(self.st['Nd'],dtype=dtype, order='C')
xx.flat[self.NdCPUorder]=k.flat[self.KdCPUorder]
# xx = xx[crop_slice_ind(self.st['Nd'])]
return xx
def xx2k(self, xx):
"""
Private: oversampled FFT on CPU
Firstly, zeroing the self.k_Kd array
Second, copy self.x_Nd array to self.k_Kd array by cSelect
Third: inplace FFT
"""
# dd = numpy.size(self.Kd)
output_x = numpy.zeros(self.Kd, dtype=self.dtype,order='C')
# output_x[crop_slice_ind(xx.shape)] = xx
output_x.flat[self.KdCPUorder]=xx.flat[self.NdCPUorder]
k = numpy.fft.fftn(output_x, self.Kd, range(0, self.ndims))
return k
def _xx2k(self ):
"""
Private: oversampled FFT on the heterogeneous device
Firstly, zeroing the self.k_Kd array
Second, copy self.x_Nd array to self.k_Kd array by cSelect
Third: inplace FFT
"""
self.cMultiplyScalar(self.zero_scalar, self.k_Kd, local_size=None, global_size=int(self.Kdprod))
self.cSelect(self.NdGPUorder, self.KdGPUorder, self.x_Nd, self.k_Kd, local_size=None, global_size=int(self.Ndprod))
self.fft( self.k_Kd,self.k_Kd,inverse=False)
self.thr.synchronize()
def _k2xx(self):
"""
Private: the inverse FFT and image cropping (which is the reverse of _xx2k() method)
"""
self.fft( self.k_Kd2, self.k_Kd2,inverse=True)
# self.x_Nd._zero_fill()
self.cMultiplyScalar(self.zero_scalar, self.x_Nd, local_size=None, global_size=int(self.Ndprod ))
# self.cSelect(self.queue, (self.Ndprod,), None, self.KdGPUorder.data, self.NdGPUorder.data, self.k_Kd2.data, self.x_Nd.data )
self.cSelect( self.KdGPUorder, self.NdGPUorder, self.k_Kd2, self.x_Nd, local_size=None, global_size=int(self.Ndprod ))
self.thr.synchronize()
def fftpack_lite_rfftb(buf, s, temp_buf=[()]):
n = len(buf)
m = (n - 1) * 2
temp = temp_buf[0]
if m >= len(temp): temp_buf[0] = temp = numpy.empty(m * 2, buf.dtype)
numpy.divide(buf, m, temp[0:n])
temp[n:m] = 0
result = numpy.fft.fftpack_lite.rfftb(temp[0:m], s)
return result
def psf(aperture, wavefront, overfill=1):
"""
Transform an aperture and the wavefront to a PSF
Args:
aperture (TYPE): aperture
wavefront (TYPE): wavefront
overfill (int, optional): number of extra pixels
Returns:
real: PSF
"""
npix = len(wavefront)
nbig = npix*overfill
wfbig = numpy.zeros((nbig,nbig),dtype='d')
half = (nbig - npix)/2
wfbig[half:half+npix,half:half+npix] = wavefront
illum = numpy.zeros((nbig,nbig),dtype='d')
illum[half:half+npix,half:half+npix] = aperture
phase = numpy.exp(wfbig*(0.+1.j))
input = illum*phase
ft = numpy.fft.fft2(input)
powft = numpy.real(numpy.conj(ft)*ft)
sorted = numpy.zeros((nbig,nbig),dtype='d')
sorted[:nbig/2,:nbig/2] = powft[nbig/2:,nbig/2:]
sorted[:nbig/2,nbig/2:] = powft[nbig/2:,:nbig/2]
sorted[nbig/2:,:nbig/2] = powft[:nbig/2,nbig/2:]
sorted[nbig/2:,nbig/2:] = powft[:nbig/2,:nbig/2]
crop = sorted[half:half+npix,half:half+npix]
fluxrat = numpy.sum(crop)/numpy.sum(sorted)
# print("Cropped PSF has %.2f%% of the flux" % (100*fluxrat))
return crop
def test_lib(self):
from pyculib.fft.binding import libcufft
cufft = libcufft()
self.assertNotEqual(libcufft().version, 0)
def test_plan1d(self):
from pyculib.fft.binding import Plan, CUFFT_C2C
n = 10
data = np.arange(n, dtype=np.complex64)
orig = data.copy()
d_data = cuda.to_device(data)
fftplan = Plan.one(CUFFT_C2C, n)
fftplan.forward(d_data, d_data)
fftplan.inverse(d_data, d_data)
d_data.copy_to_host(data)
result = data / n
self.assertTrue(np.allclose(orig, result.real))
def test_plan2d(self):
from pyculib.fft.binding import Plan, CUFFT_C2C
n = 2**4
data = np.arange(n, dtype=np.complex64).reshape(2, n//2)
orig = data.copy()
d_data = cuda.to_device(data)
fftplan = Plan.two(CUFFT_C2C, *data.shape)
fftplan.forward(d_data, d_data)
fftplan.inverse(d_data, d_data)
d_data.copy_to_host(data)
result = data / n
self.assertTrue(np.allclose(orig, result.real))
def test_plan3d(self):
from pyculib.fft.binding import Plan, CUFFT_C2C
n = 32
data = np.arange(n, dtype=np.complex64).reshape(2, 2, 8)
orig = data.copy()
d_data = cuda.to_device(data)
fftplan = Plan.three(CUFFT_C2C, *data.shape)
fftplan.forward(d_data, d_data)
fftplan.inverse(d_data, d_data)
d_data.copy_to_host(data)
result = data / n
self.assertTrue(np.allclose(orig, result.real))
def test_against_fft_1d(self):
from pyculib.fft.binding import Plan, CUFFT_R2C
N = 128
x = np.asarray(np.arange(N), dtype=np.float32)
xf = np.fft.fft(x)
d_x_gpu = cuda.to_device(x)
xf_gpu = np.zeros(N//2+1, np.complex64)
d_xf_gpu = cuda.to_device(xf_gpu)
plan = Plan.many(x.shape, CUFFT_R2C)
plan.forward(d_x_gpu, d_xf_gpu)
d_xf_gpu.copy_to_host(xf_gpu)
self.assertTrue( np.allclose(xf[0:N//2+1], xf_gpu,
atol=1e-6) )