def make_poissonian_fit(self, x_axis, data, estimator, units=None, add_params=None):
""" Performe a poissonian fit on the provided data.
@param numpy.array x_axis: 1D axis values
@param numpy.array data: 1D data, should have the same dimension as x_axis.
@param method estimator: Pointer to the estimator method
@param list units: List containing the ['horizontal', 'vertical'] units as strings
@param Parameters or dict add_params: optional, additional parameters of
type lmfit.parameter.Parameters, OrderedDict or dict for the fit
which will be used instead of the values from the estimator.
@return object result: lmfit.model.ModelFit object, all parameters
provided about the fitting, like: success,
initial fitting values, best fitting values, data
with best fit with given axis,...
"""
poissonian_model, params = self.make_poissonian_model()
error, params = estimator(x_axis, data, params)
params = self._substitute_params(initial_params=params,
update_params=add_params)
try:
result = poissonian_model.fit(data, x=x_axis, params=params)
except:
self.log.warning('The poissonian fit did not work. Check if a poisson '
'distribution is needed or a normal approximation can be'
'used. For values above 10 a normal/ gaussian distribution '
'is a good approximation.')
result = poissonian_model.fit(data, x=x_axis, params=params)
print(result.message)
if units is None:
units = ['arb. unit', 'arb. unit']
result_str_dict = dict() # create result string for gui oder OrderedDict()
result_str_dict['Amplitude'] = {'value': result.params['amplitude'].value,
'error': result.params['amplitude'].stderr,
'unit': units[1]} # Amplitude
result_str_dict['Event rate'] = {'value': result.params['mu'].value,
'error': result.params['mu'].stderr,
'unit': units[0]} # event rate
result.result_str_dict = result_str_dict
return result
python类gaussian()的实例源码
def make_poissoniandouble_fit(self, x_axis, data, estimator, units=None, add_params=None):
""" Perform a double poissonian fit on the provided data.
@param numpy.array x_axis: 1D axis values
@param numpy.array data: 1D data, should have the same dimension as x_axis.
@param method estimator: Pointer to the estimator method
@param list units: List containing the ['horizontal', 'vertical'] units as strings
@param Parameters or dict add_params: optional, additional parameters of
type lmfit.parameter.Parameters, OrderedDict or dict for the fit
which will be used instead of the values from the estimator.
@return object result: lmfit.model.ModelFit object, all parameters
provided about the fitting, like: success,
initial fitting values, best fitting values, data
with best fit with given axis,...
"""
double_poissonian_model, params = self.make_poissoniandouble_model()
error, params = estimator(x_axis, data, params)
params = self._substitute_params(initial_params=params,
update_params=add_params)
try:
result = double_poissonian_model.fit(data, x=x_axis, params=params)
except:
self.log.warning('The double poissonian fit did not work. Check if a '
'poisson distribution is needed or a normal '
'approximation can be used. For values above 10 a '
'normal/ gaussian distribution is a good '
'approximation.')
result = double_poissonian_model.fit(data, x=x_axis, params=params)
# Write the parameters to allow human-readable output to be generated
result_str_dict = OrderedDict()
if units is None:
units = ["arb. units", 'arb. unit']
result_str_dict['Amplitude 1'] = {'value': result.params['p0_amplitude'].value,
'error': result.params['p0_amplitude'].stderr,
'unit': units[0]}
result_str_dict['Event rate 1'] = {'value': result.params['p0_mu'].value,
'error': result.params['p0_mu'].stderr,
'unit': units[1]}
result_str_dict['Amplitude 2'] = {'value': result.params['p1_amplitude'].value,
'error': result.params['p1_amplitude'].stderr,
'unit': units[0]}
result_str_dict['Event rate 2'] = {'value': result.params['p1_mu'].value,
'error': result.params['p1_mu'].stderr,
'unit': units[1]}
result.result_str_dict = result_str_dict
return result
def gaussian_twoD_testing():
""" Implement and check the estimator for a 2D gaussian fit. """
data = np.empty((121,1))
amplitude=np.random.normal(3e5,1e5)
center_x=91+np.random.normal(0,0.8)
center_y=14+np.random.normal(0,0.8)
sigma_x=np.random.normal(0.7,0.2)
sigma_y=np.random.normal(0.7,0.2)
offset=0
x = np.linspace(90,92,11)
y = np.linspace(13,15,12)
xx, yy = np.meshgrid(x, y)
axes=(xx.flatten(), yy.flatten())
theta_here=10./360.*(2*np.pi)
# data=qudi_fitting.twoD_gaussian_function((xx,yy),*(amplitude,center_x,center_y,sigma_x,sigma_y,theta_here,offset))
gmod,params = qudi_fitting.make_twoDgaussian_model()
data = gmod.eval(x=axes, amplitude=amplitude, center_x=center_x,
center_y=center_y, sigma_x=sigma_x, sigma_y=sigma_y,
theta=theta_here, offset=offset)
data += 50000*np.random.random_sample(np.shape(data))
gmod, params = qudi_fitting.make_twoDgaussian_model()
para=Parameters()
# para.add('theta',vary=False)
# para.add('center_x',expr='0.5*center_y')
# para.add('sigma_x',min=0.2*((92.-90.)/11.), max= 10*(x[-1]-y[0]) )
# para.add('sigma_y',min=0.2*((15.-13.)/12.), max= 10*(y[-1]-y[0]))
# para.add('center_x',value=40,min=50,max=100)
result = qudi_fitting.make_twoDgaussian_fit(xy_axes=axes, data=data)#,add_parameters=para)
#
# FIXME: What does "Tolerance seems to be too small." mean in message?
# print(result.message)
plt.close('all')
fig, ax = plt.subplots(1, 1)
ax.hold(True)
ax.imshow(result.data.reshape(len(y),len(x)),
cmap=plt.cm.jet, origin='bottom', extent=(x.min(), x.max(),
y.min(), y.max()),interpolation="nearest")
ax.contour(x, y, result.best_fit.reshape(len(y),len(x)), 8
, colors='w')
plt.show()
# plt.close('all')
print(result.fit_report())
# print('Message:',result.message)
def tuning_curve_2d(position, spikes, xedges, yedges, occupied_thresh=0, gaussian_sigma=None):
"""Creates 2D tuning curves based on spikes and 2D position.
Parameters
----------
position : nept.Position
Must be a 2D position.
spikes : list
Containing nept.SpikeTrain for each neuron.
xedges : np.array
yedges : np.array
sampling_rate : float
occupied_thresh: float
gaussian_sigma : float
Sigma used in gaussian filter if filtering.
Returns
-------
tuning_curves : np.array
Where each inner array is the tuning curve for an individual neuron.
"""
sampling_rate = np.median(np.diff(position.time))
position_2d, pos_xedges, pos_yedges = np.histogram2d(position.y, position.x, bins=[yedges, xedges])
position_2d *= sampling_rate
shape = position_2d.shape
occupied_idx = position_2d > occupied_thresh
tc = []
for spiketrain in spikes:
spikes_x = []
spikes_y = []
for spike_time in spiketrain.time:
spike_idx = find_nearest_idx(position.time, spike_time)
if np.abs(position.time[spike_idx] - spike_time) < sampling_rate:
spikes_x.append(position.x[spike_idx])
spikes_y.append(position.y[spike_idx])
spikes_2d, spikes_xedges, spikes_yedges = np.histogram2d(spikes_y, spikes_x, bins=[yedges, xedges])
firing_rate = np.zeros(shape)
firing_rate[occupied_idx] = spikes_2d[occupied_idx] / position_2d[occupied_idx]
tc.append(firing_rate)
if gaussian_sigma is not None:
tuning_curves = []
for firing_rate in tc:
tuning_curves.append(gaussian_filter(firing_rate, gaussian_sigma))
else:
print('Tuning curves with no filter.')
tuning_curves = tc
return np.array(tuning_curves)
def bin_spikes(spikes, position, window_size, window_advance,
gaussian_std=None, n_gaussian_std=5, normalized=True):
"""Bins spikes using a sliding window.
Parameters
----------
spikes: list
Of nept.SpikeTrain
position: nept.AnalogSignal
window_size: float
window_advance: float
gaussian_std: float or None
n_gaussian_std: int
normalized: boolean
Returns
-------
binned_spikes: nept.AnalogSignal
"""
bin_edges = get_edges(position, window_advance, lastbin=True)
given_n_bins = window_size / window_advance
n_bins = int(round(given_n_bins))
if abs(n_bins - given_n_bins) > 0.01:
warnings.warn("window advance does not divide the window size evenly. "
"Using window size %g instead." % (n_bins*window_advance))
if normalized:
square_filter = np.ones(n_bins) * (1 / n_bins)
else:
square_filter = np.ones(n_bins)
counts = np.zeros((len(spikes), len(bin_edges)))
for idx, spiketrain in enumerate(spikes):
counts[idx] = np.convolve(np.hstack([np.histogram(spiketrain.time, bins=bin_edges)[0],
np.array([0])]),
square_filter, mode='same')
if gaussian_std is not None and gaussian_std > window_advance:
n_points = n_gaussian_std * gaussian_std * 2 / window_advance
n_points = max(n_points, 1.0)
if n_points % 2 == 0:
n_points += 1
n_points = int(round(n_points))
gaussian_filter = signal.gaussian(n_points, gaussian_std / window_advance)
gaussian_filter /= np.sum(gaussian_filter)
if len(gaussian_filter) < counts.shape[1]:
for idx, spiketrain in enumerate(spikes):
counts[idx] = np.convolve(counts[idx], gaussian_filter, mode='same')
else:
raise ValueError("gaussian filter too long for this length of time")
return nept.AnalogSignal(counts.T, bin_edges)
def open_files(data_path, event_file, db_path):
# Read all data
st = read(data_path)
# Fill stream with station coordinates (in SAC header)
st.get_station_coordinates()
# Read event file
try:
cat = obspy.read_events(event_file)
if len(cat) > 1:
msg = 'File %s contains more than one event. Dont know, which one\
to chose. Please provide QuakeML file with just one event.'
raise TypeError(msg)
event = cat[0]
except:
event = get_event_from_obspydmt(event_file)
origin = event.origins[0]
db = instaseis.open_db(db_path)
# Initialize with MT from event file
try:
tensor = event.focal_mechanisms[0].moment_tensor.tensor
except IndexError:
print('No moment tensor present, using explosion. Hilarity may ensue')
tensor = obspy.core.event.Tensor(m_rr=1e20, m_tt=1e20, m_pp=1e20,
m_rp=0.0, m_rt=0.0, m_tp=0.0)
# Init with Gaussian STF with a length T:
# log10 T propto 0.5*Magnitude
# Scaling is such that the 5.7 Virginia event takes 5 seconds
if len(event.magnitudes) > 0:
duration = 10 ** (0.5 * (event.magnitudes[0].mag / 5.7)) * 5.0 / 2
else:
duration = 2.5
print('Assuming duration of %8.1f sec' % duration)
stf = signal.gaussian(duration * 2, duration / 4 / db.info.dt)
return db, st, origin, tensor, stf