def __in_select_line(self,temp_spike,pos_1,pos_2):
pos_3y = temp_spike[:-1]
pos_3x = np.ones(pos_3y.shape,dtype=np.int32)*np.arange(pos_3y.shape[0])
pos_4y = temp_spike[1:]
pos_4x = np.ones(pos_3y.shape,dtype=np.int32)*np.arange(1,pos_3y.shape[0]+1)
pos_3_4 = np.vstack([ pos_3x,pos_3y,pos_4x,pos_4y]).T
is_insect = np.apply_along_axis(self.__intersect,1,pos_3_4,pos_1,pos_2)
return np.any(is_insect)
python类apply_along_axis()的实例源码
def __indexs_select_pk0(self,pk0_roi0_h0,pk0_roi0_h1,pk0_roi1_h0,pk0_roi1_h1):
# get indexs of selected waveforms in pk0
spk_in_line = np.apply_along_axis(self.__in_select_line,1,self.waveforms_pk0,pk0_roi0_h0,pk0_roi0_h1)
changed_index = np.where(spk_in_line==True)[0]
changed_index = np.array(changed_index,dtype=np.int32)
spk_in_line1 = np.apply_along_axis(self.__in_select_line,1,self.waveforms_pk0,pk0_roi1_h0,pk0_roi1_h1)
changed_index1 = np.where(spk_in_line1==True)[0]
changed_index1 = np.array(changed_index1,dtype=np.int32)
changed_index = np.intersect1d(changed_index, changed_index1)
return changed_index + self.indexs_pk0[0]
def __in_select_line(self,temp_spike,pos_1,pos_2):
pos_3y = temp_spike[:-1]
pos_3x = np.ones(pos_3y.shape,dtype=np.int32)*np.arange(pos_3y.shape[0])
pos_4y = temp_spike[1:]
pos_4x = np.ones(pos_3y.shape,dtype=np.int32)*np.arange(1,pos_3y.shape[0]+1)
pos_3_4 = np.vstack([ pos_3x,pos_3y,pos_4x,pos_4y]).T
is_insect = np.apply_along_axis(self.__intersect,1,pos_3_4,pos_1,pos_2)
return np.any(is_insect)
def __indexs_select_pk2(self,pk2_roi_pos):
x_min = pk2_roi_pos[:,0].min()
x_max = pk2_roi_pos[:,0].max()
y_min = pk2_roi_pos[:,1].min()
y_max = pk2_roi_pos[:,1].max()
pca_1,pca_2 = self.PCAusedList.currentText().split("-")
pca_1 = np.int(pca_1)-1
pca_2 = np.int(pca_2)-1
x = np.logical_and(self.wavePCAs[:,pca_1]>x_min, \
self.wavePCAs[:,pca_1]<x_max)
y = np.logical_and(self.wavePCAs[:,pca_2]>y_min, \
self.wavePCAs[:,pca_2]<y_max)
ind_0 = np.logical_and(x, y)
ind_0 = np.where(ind_0 == True)[0]
ind_0 = np.array(ind_0,dtype=np.int32)
if ind_0.shape[0]>0:
segments = []
for i in range(pk2_roi_pos.shape[0]-1):
segments.append([pk2_roi_pos[i],pk2_roi_pos[i+1]])
segments.append([pk2_roi_pos[-1],pk2_roi_pos[0]])
segments = np.array(segments)
temp_pcas = self.wavePCAs[ind_0]
temp_pcas = temp_pcas[:,[pca_1,pca_2]]
is_intersect = np.apply_along_axis(self.__intersect_roi2,1,temp_pcas,segments,pca_1)
return ind_0[is_intersect]
else:
return np.array([],dtype=np.int32)
def __detect_now(self,spike_waveforms,selectChan,current_page):
if selectChan+"_"+str(current_page) in self.windowsState:
use_shape0 = self.__pk0_roi0_pos(selectChan,current_page)
spk_in_line = np.apply_along_axis(self.__in_select_line,1,spike_waveforms,use_shape0[0],use_shape0[1])
use_shape1 = self.__pk0_roi1_pos(selectChan,current_page)
spk_in_line1 = np.apply_along_axis(self.__in_select_line,1,spike_waveforms,use_shape1[0],use_shape1[1])
detected_mask = spk_in_line & spk_in_line1
else:
detected_mask = np.ones(spike_waveforms.shape[0],dtype=bool)
return detected_mask
# check whether a spike's waveform is intersect with segment widget
def __in_select_line(self,temp_spike,pos_1,pos_2):
pos_3y = temp_spike[:-1]
pos_3x = np.ones(pos_3y.shape,dtype=np.int32)*np.arange(pos_3y.shape[0])
pos_4y = temp_spike[1:]
pos_4x = np.ones(pos_3y.shape,dtype=np.int32)*np.arange(1,pos_3y.shape[0]+1)
pos_3_4 = np.vstack([ pos_3x,pos_3y,pos_4x,pos_4y]).T
is_insect = np.apply_along_axis(self.__intersect,1,pos_3_4,pos_1,pos_2)
return np.any(is_insect)
def __indexs_select_pk0(self,pk0_roi0_h0,pk0_roi0_h1,pk0_roi1_h0,pk0_roi1_h1):
# get indexs of selected waveforms in pk0
spk_in_line = np.apply_along_axis(self.__in_select_line,1,self.waveforms_pk0,pk0_roi0_h0,pk0_roi0_h1)
changed_index = np.where(spk_in_line==True)[0]
changed_index = np.array(changed_index,dtype=np.int32)
spk_in_line1 = np.apply_along_axis(self.__in_select_line,1,self.waveforms_pk0,pk0_roi1_h0,pk0_roi1_h1)
changed_index1 = np.where(spk_in_line1==True)[0]
changed_index1 = np.array(changed_index1,dtype=np.int32)
changed_index = np.intersect1d(changed_index, changed_index1)
return changed_index + self.indexs_pk0[0]
def __in_select_line(self,temp_spike,pos_1,pos_2):
pos_3y = temp_spike[:-1]
pos_3x = np.ones(pos_3y.shape,dtype=np.int32)*np.arange(pos_3y.shape[0])
pos_4y = temp_spike[1:]
pos_4x = np.ones(pos_3y.shape,dtype=np.int32)*np.arange(1,pos_3y.shape[0]+1)
pos_3_4 = np.vstack([ pos_3x,pos_3y,pos_4x,pos_4y]).T
is_insect = np.apply_along_axis(self.__intersect,1,pos_3_4,pos_1,pos_2)
return np.any(is_insect)
def __indexs_select_pk2(self,pk2_roi_pos):
x_min = pk2_roi_pos[:,0].min()
x_max = pk2_roi_pos[:,0].max()
y_min = pk2_roi_pos[:,1].min()
y_max = pk2_roi_pos[:,1].max()
pca_1,pca_2 = self.PCAusedList.currentText().split("-")
pca_1 = np.int(pca_1)-1
pca_2 = np.int(pca_2)-1
x = np.logical_and(self.wavePCAs[:,pca_1]>x_min, \
self.wavePCAs[:,pca_1]<x_max)
y = np.logical_and(self.wavePCAs[:,pca_2]>y_min, \
self.wavePCAs[:,pca_2]<y_max)
ind_0 = np.logical_and(x, y)
ind_0 = np.where(ind_0 == True)[0]
ind_0 = np.array(ind_0,dtype=np.int32)
if ind_0.shape[0]>0:
segments = []
for i in range(pk2_roi_pos.shape[0]-1):
segments.append([pk2_roi_pos[i],pk2_roi_pos[i+1]])
segments.append([pk2_roi_pos[-1],pk2_roi_pos[0]])
segments = np.array(segments)
temp_pcas = self.wavePCAs[ind_0]
temp_pcas = temp_pcas[:,[pca_1,pca_2]]
is_intersect = np.apply_along_axis(self.__intersect_roi2,1,temp_pcas,segments,pca_1)
return ind_0[is_intersect]
else:
return np.array([],dtype=np.int32)
def normalize_simple(matrix, mask):
"""Normalizes a matrix by columns, and then by rows. With multiple
time-series, the data are normalized to the within-series total, not the
entire data set total.
Parameters
----------
matrix: np.matrix
Time-series matrix of abundance counts. Rows are sequences, columns
are samples/time-points.
mask: list or np.array
List of objects with length matching the number of timepoints, where
unique values delineate multiple time-series. If there is only one
time-series in the data set, it's a list of identical objects.
Returns
-------
normal_matrix: np.matrix
Matrix where the columns (within-sample) have been converted to
proportions, then the rows are normalized to sum to 1.
"""
normal_matrix = matrix / matrix.sum(0)
normal_matrix[np.invert(np.isfinite(normal_matrix))] = 0
for mask_val in np.unique(mask):
y = normal_matrix[:, np.where(mask == mask_val)[0]]
y = np.apply_along_axis(zscore, 1, y)
normal_matrix[:, np.where(mask == mask_val)[0]] = y
del y
return normal_matrix
def feat_eeg(signals):
"""
calculate the relative power as defined by Leangkvist (2012),
assuming signal is recorded with 100hz
"""
if signals.ndim == 1: signals = np.expand_dims(signals,0)
sfreq = use_sfreq
nsamp = float(signals.shape[1])
feats = np.zeros((signals.shape[0],9),dtype='float32')
# 5 FEATURE for freq babnds
w = (fft(signals,axis=1)).real
delta = np.sum(np.abs(w[:,np.arange(0.5*nsamp/sfreq,4*nsamp/sfreq, dtype=int)]),axis=1)
theta = np.sum(np.abs(w[:,np.arange(4*nsamp/sfreq,8*nsamp/sfreq, dtype=int)]),axis=1)
alpha = np.sum(np.abs(w[:,np.arange(8*nsamp/sfreq,13*nsamp/sfreq, dtype=int)]),axis=1)
beta = np.sum(np.abs(w[:,np.arange(13*nsamp/sfreq,20*nsamp/sfreq, dtype=int)]),axis=1)
gamma = np.sum(np.abs(w[:,np.arange(20*nsamp/sfreq,50*nsamp/sfreq, dtype=int)]),axis=1) # only until 50, because hz=100
spindle = np.sum(np.abs(w[:,np.arange(12*nsamp/sfreq,14*nsamp/sfreq, dtype=int)]),axis=1)
sum_abs_pow = delta + theta + alpha + beta + gamma + spindle
feats[:,0] = delta /sum_abs_pow
feats[:,1] = theta /sum_abs_pow
feats[:,2] = alpha /sum_abs_pow
feats[:,3] = beta /sum_abs_pow
feats[:,4] = gamma /sum_abs_pow
feats[:,5] = spindle /sum_abs_pow
feats[:,6] = np.log10(stats.kurtosis(signals, fisher=False, axis=1)) # kurtosis
feats[:,7] = np.log10(-np.sum([(x/nsamp)*(np.log(x/nsamp)) for x in np.apply_along_axis(lambda x: np.histogram(x, bins=8)[0], 1, signals)],axis=1)) # entropy.. yay, one line...
#feats[:,7] = np.polynomial.polynomial.polyfit(np.log(f[np.arange(0.5*nsamp/sfreq,50*nsamp/sfreq, dtype=int)]), np.log(w[0,np.arange(0.5*nsamp/sfreq,50*nsamp/sfreq, dtype=int)]),1)
feats[:,8] = np.dot(np.array([3.5,4,5,7,30]),feats[:,0:5].T ) / (sfreq/2-0.5)
if np.any(feats==np.nan): print('NaN detected')
return np.nan_to_num(feats)
def feat_wavelet(signals):
"""
calculate the relative power as defined by Leangkvist (2012),
assuming signal is recorded with 100hz
"""
if signals.ndim == 1: signals = np.expand_dims(signals,0)
sfreq = use_sfreq
nsamp = float(signals.shape[1])
feats = np.zeros((signals.shape[0],8),dtype='float32')
# 5 FEATURE for freq babnds
w = (fft(signals,axis=1)).real
delta = np.sum(np.abs(w[:,np.arange(0.5*nsamp/sfreq,4*nsamp/sfreq, dtype=int)]),axis=1)
theta = np.sum(np.abs(w[:,np.arange(4*nsamp/sfreq,8*nsamp/sfreq, dtype=int)]),axis=1)
alpha = np.sum(np.abs(w[:,np.arange(8*nsamp/sfreq,13*nsamp/sfreq, dtype=int)]),axis=1)
beta = np.sum(np.abs(w[:,np.arange(13*nsamp/sfreq,20*nsamp/sfreq, dtype=int)]),axis=1)
gamma = np.sum(np.abs(w[:,np.arange(20*nsamp/sfreq,50*nsamp/sfreq, dtype=int)]),axis=1) # only until 50, because hz=100
sum_abs_pow = delta + theta + alpha + beta + gamma
feats[:,0] = delta /sum_abs_pow
feats[:,1] = theta /sum_abs_pow
feats[:,2] = alpha /sum_abs_pow
feats[:,3] = beta /sum_abs_pow
feats[:,4] = gamma /sum_abs_pow
feats[:,5] = np.log10(stats.kurtosis(signals,fisher=False,axis=1)) # kurtosis
feats[:,6] = np.log10(-np.sum([(x/nsamp)*(np.log(x/nsamp)) for x in np.apply_along_axis(lambda x: np.histogram(x, bins=8)[0], 1, signals)],axis=1)) # entropy.. yay, one line...
#feats[:,7] = np.polynomial.polynomial.polyfit(np.log(f[np.arange(0.5*nsamp/sfreq,50*nsamp/sfreq, dtype=int)]), np.log(w[0,np.arange(0.5*nsamp/sfreq,50*nsamp/sfreq, dtype=int)]),1)
feats[:,7] = np.dot(np.array([3.5,4,5,7,30]),feats[:,0:5].T ) / (sfreq/2-0.5)
if np.any(feats==np.nan): print('NaN detected')
return np.nan_to_num(feats)
def feat_eog(signals):
"""
calculate the EOG features
:param signals: 1D or 2D signals
"""
if signals.ndim == 1: signals = np.expand_dims(signals,0)
sfreq = use_sfreq
nsamp = float(signals.shape[1])
w = (fft(signals,axis=1)).real
feats = np.zeros((signals.shape[0],15),dtype='float32')
delta = np.sum(np.abs(w[:,np.arange(0.5*nsamp/sfreq,4*nsamp/sfreq, dtype=int)]),axis=1)
theta = np.sum(np.abs(w[:,np.arange(4*nsamp/sfreq,8*nsamp/sfreq, dtype=int)]),axis=1)
alpha = np.sum(np.abs(w[:,np.arange(8*nsamp/sfreq,13*nsamp/sfreq, dtype=int)]),axis=1)
beta = np.sum(np.abs(w[:,np.arange(13*nsamp/sfreq,20*nsamp/sfreq, dtype=int)]),axis=1)
gamma = np.sum(np.abs(w[:,np.arange(20*nsamp/sfreq,50*nsamp/sfreq, dtype=int)]),axis=1) # only until 50, because hz=100
sum_abs_pow = delta + theta + alpha + beta + gamma
feats[:,0] = delta /sum_abs_pow
feats[:,1] = theta /sum_abs_pow
feats[:,2] = alpha /sum_abs_pow
feats[:,3] = beta /sum_abs_pow
feats[:,4] = gamma /sum_abs_pow
feats[:,5] = np.dot(np.array([3.5,4,5,7,30]),feats[:,0:5].T ) / (sfreq/2-0.5) #smean
feats[:,6] = np.sqrt(np.max(signals, axis=1)) #PAV
feats[:,7] = np.sqrt(np.abs(np.min(signals, axis=1))) #VAV
feats[:,8] = np.argmax(signals, axis=1)/nsamp #PAP
feats[:,9] = np.argmin(signals, axis=1)/nsamp #VAP
feats[:,10] = np.sqrt(np.sum(np.abs(signals), axis=1)/ np.mean(np.sum(np.abs(signals), axis=1))) # AUC
feats[:,11] = np.sum(((np.roll(np.sign(signals), 1,axis=1) - np.sign(signals)) != 0).astype(int),axis=1)/nsamp #TVC
feats[:,12] = np.log10(np.std(signals, axis=1)) #STD/VAR
feats[:,13] = np.log10(stats.kurtosis(signals,fisher=False,axis=1)) # kurtosis
feats[:,14] = np.log10(-np.sum([(x/nsamp)*((np.log((x+np.spacing(1))/nsamp))) for x in np.apply_along_axis(lambda x: np.histogram(x, bins=8)[0], 1, signals)],axis=1)) # entropy.. yay, one line...
if np.any(feats==np.nan): print('NaN detected')
return np.nan_to_num(feats)
def feat_emg(signals):
"""
calculate the EMG median as defined by Leangkvist (2012),
"""
if signals.ndim == 1: signals = np.expand_dims(signals,0)
sfreq = use_sfreq
nsamp = float(signals.shape[1])
w = (fft(signals,axis=1)).real
feats = np.zeros((signals.shape[0],13),dtype='float32')
delta = np.sum(np.abs(w[:,np.arange(0.5*nsamp/sfreq,4*nsamp/sfreq, dtype=int)]),axis=1)
theta = np.sum(np.abs(w[:,np.arange(4*nsamp/sfreq,8*nsamp/sfreq, dtype=int)]),axis=1)
alpha = np.sum(np.abs(w[:,np.arange(8*nsamp/sfreq,13*nsamp/sfreq, dtype=int)]),axis=1)
beta = np.sum(np.abs(w[:,np.arange(13*nsamp/sfreq,20*nsamp/sfreq, dtype=int)]),axis=1)
gamma = np.sum(np.abs(w[:,np.arange(20*nsamp/sfreq,50*nsamp/sfreq, dtype=int)]),axis=1) # only until 50, because hz=100
sum_abs_pow = delta + theta + alpha + beta + gamma
feats[:,0] = delta /sum_abs_pow
feats[:,1] = theta /sum_abs_pow
feats[:,2] = alpha /sum_abs_pow
feats[:,3] = beta /sum_abs_pow
feats[:,4] = gamma /sum_abs_pow
feats[:,5] = np.dot(np.array([3.5,4,5,7,30]),feats[:,0:5].T ) / (sfreq/2-0.5) #smean
emg = np.sum(np.abs(w[:,np.arange(12.5*nsamp/sfreq,32*nsamp/sfreq, dtype=int)]),axis=1)
feats[:,6] = emg / np.sum(np.abs(w[:,np.arange(8*nsamp/sfreq,32*nsamp/sfreq, dtype=int)]),axis=1) # ratio of high freq to total motor
feats[:,7] = np.median(np.abs(w[:,np.arange(8*nsamp/sfreq,32*nsamp/sfreq, dtype=int)]),axis=1) # median freq
feats[:,8] = np.mean(np.abs(w[:,np.arange(8*nsamp/sfreq,32*nsamp/sfreq, dtype=int)]),axis=1) # mean freq
feats[:,9] = np.std(signals, axis=1) # std
feats[:,10] = np.mean(signals,axis=1)
feats[:,11] = np.log10(stats.kurtosis(signals,fisher=False,axis=1) )
feats[:,12] = np.log10(-np.sum([(x/nsamp)*((np.log((x+np.spacing(1))/nsamp))) for x in np.apply_along_axis(lambda x: np.histogram(x, bins=8)[0], 1, signals)],axis=1)) # entropy.. yay, one line...
if np.any(feats==np.nan): print('NaN detected')
return np.nan_to_num(feats)
def jaccard(inclusion):
"""Calculate jaccard distances for a community."""
logger.info("calculating jaccard distance for {}x{} input matrix".format(
*inclusion.shape))
jaccard = np.apply_along_axis(
lambda a: (a & inclusion).sum(1), 1, inclusion)
jaccard = jaccard / np.apply_along_axis(
lambda a: (a | inclusion).sum(1), 1, inclusion)
return 1 - jaccard
def euclidean(inclusion):
"""Calculate euclidean distances for a community."""
logger.info("calculating euclidean distance for {}x{} input matrix".format(
*inclusion.shape))
euclidean = np.apply_along_axis(
lambda a: ((a - inclusion) ** 2).sum(1), 1, inclusion)
return np.sqrt(euclidean)
def calc_pairwise_cosine(model):
n = model.num_topics
weights = model.state.get_lambda()
weights = np.apply_along_axis(lambda x: x / x.sum(), 1, weights) # get dist.
weights = unitmatrix(weights) # normalize
score = []
for i in range(n):
for j in range(i + 1, n):
score.append(np.arccos(weights[i].dot(weights[j])))
return np.mean(score), np.std(score)
def calc_pairwise_dev(model):
# the average squared deviation from 0 (90 degree)
n = model.num_topics
weights = model.state.get_lambda()
weights = np.apply_along_axis(lambda x: x / x.sum(), 1, weights) # get dist.
weights = unitmatrix(weights) # normalize
score = 0.
for i in range(n):
for j in range(i + 1, n):
score += (weights[i].dot(weights[j]))**2
return np.sqrt(2. * score / n / (n - 1))
def decode(self, X, mode='argmax'):
if mode == 'argmax':
X = X.argmax(axis=-1)
elif mode == 'choice':
X = np.apply_along_axis(lambda vec: \
np.random.choice(len(vec), 1,
p=(vec / np.sum(vec))),
axis=-1, arr=X).ravel()
return str.join('',(self.indices_char[x] for x in X))
def _nanmedian_small(a, axis=None, out=None, overwrite_input=False):
"""
sort + indexing median, faster for small medians along multiple
dimensions due to the high overhead of apply_along_axis
see nanmedian for parameter usage
"""
a = np.ma.masked_array(a, np.isnan(a))
m = np.ma.median(a, axis=axis, overwrite_input=overwrite_input)
for i in range(np.count_nonzero(m.mask.ravel())):
warnings.warn("All-NaN slice encountered", RuntimeWarning)
if out is not None:
out[...] = m.filled(np.nan)
return out
return m.filled(np.nan)