from scipy import stats

def linreg(x, y, units='PW/Sv'):
    """ Return linear regression model and plot label """
    if len(x) > 1:
        slope, intercept, r_value, p_value, std_err = stats.linregress(x, y)
        y_model = x * slope + intercept
        label = '(%5.3f %s)' % (slope, units)
    else:
        y_model = None
        label = ''
    return y_model, label
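A minimal usage sketch on synthetic data (the values below are illustrative only):

import numpy as np

x = np.linspace(0.0, 10.0, 20)
y = 1.4 * x + 0.3 + np.random.default_rng(0).normal(0.0, 0.1, x.size)
y_model, label = linreg(x, y)
print(label)   # e.g. '(1.399 PW/Sv)'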
import pandas as pd
from scipy import stats

def Get_Summary_Stats(df):
    slope, intercept, r_value, p_value, std_err = \
        stats.linregress(df['act_ret_diff'], df['expec_ret_diff'])
    r2_value = r_value**2
    df_stats = pd.DataFrame({'slope': [slope], 'intercept': [intercept],
                             'r2_value': [r2_value], 'p_value': [p_value], 'std_err': [std_err]})
    df_stats.to_csv(cfg.DATA_PATH + cfg.CLEAN_DATA_FILE + '_summary_stats.csv', index=False)
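`cfg` here is the project's external config module; a hypothetical stub to run the snippet standalone (column names taken from the call above):

class cfg:   # hypothetical stand-in for the project's config module
    DATA_PATH = './'
    CLEAN_DATA_FILE = 'returns'

df = pd.DataFrame({'act_ret_diff': [0.10, 0.20, 0.30, 0.40],
                   'expec_ret_diff': [0.12, 0.18, 0.33, 0.41]})
Get_Summary_Stats(df)   # writes ./returns_summary_stats.csv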
from scipy.stats import linregress

def get_slopes(X, y):
    slope, __, __, __, __ = linregress(X, y)
    return slope
def fit_line(self):
    if self.received_data and self.xs.shape[0] > 0:
        # fit a line to the Euclidean-space laser data in the window of interest
        slope, intercept, r_val, p_val, std_err = stats.linregress(self.xs, self.ys)
        self.m = slope
        self.c = intercept
        # window the data, compute the line fit and associated control
def getAlphaBeta(self, interval=100):
    """Formula: beta = cov(dailyROR, marketROR) / var(marketROR), or equivalently the slope
    of a linear regression of dailyROR on marketROR (alpha is the intercept)."""
    n = min(len(self.dailyROR), len(self.marketROR)) - interval
    linreg = np.array([stats.linregress(self.marketROR[i:i + interval][:, 1].astype(float),
                                        self.dailyROR[i:i + interval][:, 1].astype(float))
                       for i in range(n)])
    # linregress returns (slope, intercept, ...): column 0 is beta, column 1 is alpha
    Alpha = [(self.Date[i], linreg[i, 1]) for i in range(n)]
    Beta = [(self.Date[i], linreg[i, 0]) for i in range(n)]
    return np.array(Alpha), np.array(Beta)
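As a sanity check on the docstring's formula, the regression slope on synthetic returns matches cov/var (names and values below are illustrative):

import numpy as np
from scipy import stats

rng = np.random.default_rng(1)
market = rng.normal(0.0005, 0.01, 250)                     # synthetic daily market returns
asset = 0.0002 + 1.3 * market + rng.normal(0, 0.002, 250)  # true beta = 1.3, alpha = 0.0002
beta_cov = np.cov(asset, market)[0, 1] / np.var(market, ddof=1)
fit = stats.linregress(market, asset)
print(beta_cov, fit.slope, fit.intercept)                  # both beta estimates agree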
import numpy as np
from scipy.stats import linregress

def GetDeseasonalizedData(period, aodvalues, allLats, allLongs, months, meanAOD, rejectzeros=True, uprof=0):
    mlist = []
    for m in months:
        if int(m.split('-')[0]) in period:
            mlist.append(m.split('-')[1] + m.split('-')[0])
    mlist.sort()
    data = np.zeros((len(allLats), len(allLongs), len(mlist)))
    slopedata = np.zeros((len(allLats), len(allLongs)))
    interceptdata = np.zeros((len(allLats), len(allLongs)))
    for e in aodvalues:
        i = allLats.index(e.latitude)
        j = allLongs.index(e.longitude)
        if not rejectzeros:
            numcheck = isfloat(e.aod_12)
        else:
            numcheck = False
            if isfloat(e.aod_12):
                if float(e.aod_12) > 0.0:
                    numcheck = True
        if e.month in period and numcheck and int(e.uprofiles) >= uprof:
            k = mlist.index(str(e.year) + str(e.month).zfill(2))
            if meanAOD[i][j] > 0:
                data[i][j][k] = (float(e.aod_12) - meanAOD[i][j]) / meanAOD[i][j] * 100
    x = np.arange(0, len(mlist))
    for ind, e in np.ndenumerate(data[:, :, -1]):
        slope, intercept, r_value, p_value, std_err = linregress(x, data[ind[0]][ind[1]])
        slopedata[ind[0]][ind[1]] = slope
        interceptdata[ind[0]][ind[1]] = intercept
    return slopedata, interceptdata, data
cloud_frequency.py — from the project Cloud-variability-time-frequency by florentbrient
import statsmodels.api as sm
from scipy import stats

def slope_create(F0, F1):
    # Ordinary least-squares slope
    slope, intercept, r_value, p_value, std_err = stats.linregress(F0, F1)
    # Slope from robust regression (Huber loss down-weights outliers)
    x = sm.add_constant(F0)
    y = F1
    rlm_results = sm.RLM(y, x, M=sm.robust.norms.HuberT()).fit()
    slope_r = rlm_results.params[-1]
    intercept_r = rlm_results.params[0]
    del x, y, rlm_results
    return slope, intercept, r_value, slope_r, intercept_r
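A sketch of why both fits are returned: with a single outlier injected, the OLS slope is pulled away while the Huber-weighted robust slope stays near the true value (synthetic data):

import numpy as np

x = np.linspace(0, 10, 50)
y = 2.0 * x + 1.0
y[25] += 40.0                           # one gross outlier
slope, intercept, r, slope_r, intercept_r = slope_create(x, y)
print(slope, slope_r)                   # OLS slope is biased upward; robust slope stays near 2.0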
import warnings
import numpy as np
from scipy.optimize import minimize
from scipy.stats import linregress

def weighted_least_squares(X, Y, W):
    '''
    Initialize a power-law fit, e.g.:
        EPS = 1e-10
        use = (X>EPS)&(Y>EPS)
        weighted_least_squares(np.log(X+EPS)[use], np.log(Y+EPS)[use], 1/(EPS+X[use]))
    Parameters
    ----------
    X: List of distances
    Y: List of amplitudes
    W: Weights for points
    Returns
    -------
    result : object
        Optimization result returned by scipy.optimize.minimize. See
        scipy.optimize documentation for details.
    '''
    X = np.float64(X)
    Y = np.float64(Y)
    def objective(ab):
        a, b = ab
        return np.sum(W * (Y - (X * a + b)) ** 2)
    a, b, _, _, _ = linregress(X, Y)
    result = minimize(objective, [a, b])
    if not result.success:
        print(result.message)
        warnings.warn('Optimization failed: %s' % result.message)
    return result
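Following the recipe in the docstring, a runnable power-law example (the exponent and amplitude values are made up for illustration):

import numpy as np

rng = np.random.default_rng(2)
X = np.linspace(1.0, 100.0, 80)
Y = 3.0 * X ** -0.7 * np.exp(rng.normal(0.0, 0.05, X.size))   # noisy power law
EPS = 1e-10
use = (X > EPS) & (Y > EPS)
result = weighted_least_squares(np.log(X + EPS)[use], np.log(Y + EPS)[use], 1 / (EPS + X[use]))
exponent, log_amplitude = result.x
print(exponent)   # close to -0.7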
import numpy as np
from scipy import stats

def fit_lm(y):
    x = np.array(range(len(y)))
    gradient, intercept, r_value, p_value, std_err = stats.linregress(x, y)
    fit_line = [(x_val * gradient) + intercept for x_val in x]
    return gradient, intercept, r_value, p_value, std_err, fit_line
import numpy as np
from scipy import stats

def computeSlope(y):
    """Compute the slope of the best-fit line to y under a linear regression."""
    x = np.arange(len(y))
    y -= np.mean(y)   # centering y does not change the slope (mutates y in place)
    slope, _, _, _, _ = stats.linregress(x, y)
    return slope
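Note that `y -= np.mean(y)` mutates the caller's array in place; pass a copy if the original values matter. A quick check:

y = np.array([1.0, 3.0, 5.0, 7.0])
print(computeSlope(y.copy()))   # 2.0, the slope per index step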
from scipy import stats

def calculate_breaking_points(quant_list):
    MAX_ERROR = 5
    RANGE = 5
    CENTER = 50
    BRAKE_POINTS = dict()
    # right-to-left window
    for x_iter in range(100, CENTER, -1):
        x_proj = range(x_iter - RANGE, x_iter)
        # pylab.plot(x_proj, quant_list[x_iter - RANGE:x_iter], 'k', alpha=0.5)
        y_subset = quant_list[x_iter - RANGE:x_iter]
        slope, intercept, r_value, p_value, std_err = stats.linregress(x_proj, y_subset)
        g, l = calculate_error(slope, intercept, x_proj, y_subset)
        # print x_iter-RANGE, x_iter, slope, intercept, y_subset[0], f(x_iter, slope, intercept), l, r
        if l > MAX_ERROR:
            BRAKE_POINTS[x_iter - RANGE // 2] = {"error": l, "slope": slope, "offset": intercept}
    # left-to-right window
    for x_iter in range(0, CENTER, 1):
        x_proj = range(x_iter, x_iter + RANGE)
        # pylab.plot(x_proj, quant_list[x_iter:x_iter + RANGE], 'k', alpha=0.3)
        y_subset = quant_list[x_iter:x_iter + RANGE]
        slope, intercept, r_value, p_value, std_err = stats.linregress(x_proj, y_subset)
        g, l = calculate_error(slope, intercept, x_proj, y_subset)
        # print x_iter, x_iter+RANGE, slope, intercept, y_subset[0], f(x_iter, slope, intercept), l, r
        if l > MAX_ERROR:
            # don't overwrite a window centre that was already flagged
            if (x_iter + RANGE // 2) not in BRAKE_POINTS:
                BRAKE_POINTS[x_iter + RANGE // 2] = {"error": l, "slope": slope, "offset": intercept}
                # pylab.plot([x_iter, x_iter + RANGE], [f(x_iter, slope, intercept), f(x_iter + RANGE, slope, intercept)], "b", alpha=0.3)
    return BRAKE_POINTS
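`calculate_error` is not shown in this snippet. Given how `g, l` are used, a plausible hypothetical stand-in returns a global (summed) and local (maximum) absolute residual for the window:

import numpy as np

def calculate_error(slope, intercept, x_proj, y_subset):
    # hypothetical: global error = summed |residual|, local error = max |residual|
    residuals = np.abs(np.asarray(y_subset) - (slope * np.asarray(x_proj) + intercept))
    return residuals.sum(), residuals.max()

quant = list(np.percentile(np.random.default_rng(3).lognormal(size=2000), range(101)))
print(calculate_breaking_points(quant))   # window centres where the local fit error exceeds MAX_ERROR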
import numpy
from scipy.optimize import curve_fit
from scipy.stats import linregress

def fit_exponential(series, *, p0=(1.0, 0.01, 0.0), n0: float = None):
    """
    Fits an exponential to a series. First attempts an exponential fit in linear space using p0,
    then falls back to a fit in log space to find new starting parameters p0 for the linear-space
    fit; if all else fails, returns the log-space fit.
    """
    if n0 is None:
        fit_fn = exponential
    else:
        def exponential_constrain_n0(x, a, b):
            return a * numpy.exp(b * x) + n0
        fit_fn = exponential_constrain_n0
        p0 = p0[:2]
    try:
        popt, pcov = curve_fit(fit_fn, series.index, series.values, p0=p0, maxfev=10000)
        if n0 is not None:
            popt = tuple(popt) + (n0,)
    except RuntimeError:
        pass
    else:
        slope = popt[1]
        intercept = numpy.log(1 / popt[0]) / slope
        N0 = popt[2]
        if slope >= 0:
            snr = signal_noise_ratio(series, *popt)
            return slope, intercept, N0, snr, False
    # fall back to a linear regression in log space
    log_series = numpy.log(series[series > 0] - (n0 or 0.0))
    slope, c, *__ = linregress(log_series.index, log_series.values)
    intercept = -c / slope
    if n0 is None:
        p0 = (numpy.exp(c), slope, 0.0)
    else:
        p0 = (numpy.exp(c), slope)
    try:
        popt, pcov = curve_fit(fit_fn, series.index, series.values, p0=p0, maxfev=10000)
        if n0 is not None:
            popt = tuple(popt) + (n0,)
    except RuntimeError:
        snr = signal_noise_ratio(series, c, slope, 0.0)
        return slope, intercept, (n0 or 0.0), snr, True
    else:
        slope = popt[1]
        intercept = numpy.log(1 / popt[0]) / slope
        n0 = popt[2]
        snr = signal_noise_ratio(series, *popt)
        return slope, intercept, n0, snr, False
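`exponential` and `signal_noise_ratio` are defined elsewhere in the original project; plausible sketches consistent with how they are called above (three parameters, with `n0` as an additive offset) might look like:

import numpy

def exponential(x, a, b, n0):
    # assumed model: a * exp(b * x) + n0, mirroring exponential_constrain_n0 above
    return a * numpy.exp(b * numpy.asarray(x, dtype=float)) + n0

def signal_noise_ratio(series, a, b, n0):
    # hypothetical: fitted-signal variance over residual variance
    fitted = exponential(series.index.values, a, b, n0)
    residual = series.values - fitted
    return numpy.var(fitted) / numpy.var(residual)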
import math
import random
import numpy as np
import pandas as pd
from scipy import stats

def analyze_r01(state, human, time):
    """
    Iteration 1
    """
    # Read data
    if human:
        DATA_CSV = 'data/' + state + '_2a.csv'
    else:
        DATA_CSV = 'data/' + state + '_2b.csv'
    data_points = pd.read_csv(DATA_CSV)
    # Shift carbon and year one row back
    nr1 = data_points['carb']
    nr1 = nr1.iloc[1:len(nr1)]
    nr2 = data_points['py']
    nr2 = nr2.iloc[1:len(nr2)]
    data_points = data_points.iloc[0:len(data_points.index) - 1]
    nr1.index = np.arange(len(nr1.index))
    nr2.index = np.arange(len(nr2.index))
    # Now we can calculate the difference in carbon
    if time:
        data_points.loc[:, 'growth'] = (nr1 - data_points['carb']) / (nr2 - data_points['py'])
    else:
        data_points.loc[:, 'growth'] = nr1
    data_points.loc[:, 'post_py'] = nr2
    data_points = data_points[data_points.post_py // 10000 == data_points.py // 10000]
    data_points.drop(['py', 'post_py'], axis=1, inplace=True)
    data_points.index = np.arange(len(data_points.index))
    data_points = data_points.loc[:, ['carb', 'growth']]
    data_points = data_points.values.tolist()
    # Split data into training and testing halves
    random.shuffle(data_points)
    training = data_points[0:len(data_points) // 2]
    test = data_points[len(data_points) // 2:len(data_points)]
    training = np.array(training)
    # Fit the linear regression
    fit = stats.linregress(training)
    m, n = fit.slope, fit.intercept
    sq_error = 0.0
    # Perform validation
    for elem in test:
        predicted = m * elem[0] + n
        actual = elem[1]
        sq_error += (actual - predicted) ** 2
    mse = math.sqrt(sq_error / len(test))   # note: this is the root-mean-square error
    return mse
def analyze_r02(state, human, time):
    """
    Revision 2
    """
    # Read data
    if human:
        DATA_CSV = 'data/' + state + '_2a.csv'
    else:
        DATA_CSV = 'data/' + state + '_2b.csv'
    data_points = pd.read_csv(DATA_CSV)
    # Shift carbon and year one row back
    nr1 = data_points['carb']
    nr1 = nr1.iloc[1:len(nr1)]
    nr2 = data_points['py']
    nr2 = nr2.iloc[1:len(nr2)]
    data_points = data_points.iloc[0:len(data_points.index) - 1]
    nr1.index = np.arange(len(nr1.index))
    nr2.index = np.arange(len(nr2.index))
    # Now we can calculate the difference in carbon
    if time:
        data_points.loc[:, 'growth'] = (nr1 - data_points['carb']) / (nr2 - data_points['py'])
    else:
        data_points.loc[:, 'growth'] = nr1
    data_points.loc[:, 'post_py'] = nr2
    data_points = data_points[data_points.post_py // 10000 == data_points.py // 10000]
    data_points.drop(['py', 'post_py'], axis=1, inplace=True)
    data_points.index = np.arange(len(data_points.index))
    data_points = data_points.loc[:, ['carb', 'growth']]
    data_points = data_points.values.tolist()
    # Split data into 10 groups
    N_GROUPS = 10
    random.shuffle(data_points)
    groups = []
    prev_cutoff = 0
    for i in np.arange(N_GROUPS):
        next_cutoff = (i + 1) * len(data_points) // N_GROUPS
        groups.append(data_points[prev_cutoff:next_cutoff])
        prev_cutoff = next_cutoff
    # Build the model while performing 10-fold cross-validation
    sum_mse = 0
    for i in np.arange(N_GROUPS):
        training = []
        test = []
        for j, group in enumerate(groups):
            if j == i:
                test = group
            else:
                training.extend(group)
        training = np.array(training)
        fit = stats.linregress(training)
        m, n = fit.slope, fit.intercept
        sq_error = 0.0
        for elem in test:
            predicted = m * elem[0] + n
            actual = elem[1]
            sq_error += (actual - predicted) ** 2
        mse = math.sqrt(sq_error / len(test))   # root-mean-square error per fold
        sum_mse += mse
    return sum_mse / N_GROUPS
def calcAFunc(self, MaggNow, AaggNow):
    '''
    Calculate a new aggregate savings rule based on the history
    of the aggregate savings and aggregate market resources from a simulation.
    Parameters
    ----------
    MaggNow : [float]
        List of the history of the simulated aggregate market resources for an economy.
    AaggNow : [float]
        List of the history of the simulated aggregate savings for an economy.
    Returns
    -------
    (unnamed) : AggShocksDynamicRule
        Object containing a new savings rule
    '''
    verbose = True
    discard_periods = 200  # Throw out the first T periods to allow the simulation to approach the SS
    update_weight = 0.5    # Proportional weight to put on new function vs old function parameters
    total_periods = len(MaggNow)
    # Regress log savings on one-period-lagged log market resources
    logAagg = np.log(AaggNow[discard_periods:total_periods])
    logMagg = np.log(MaggNow[discard_periods - 1:total_periods - 1])
    slope, intercept, r_value, p_value, std_err = stats.linregress(logMagg, logAagg)
    # Make a new aggregate savings rule by combining the new regression parameters
    # with the previous guess
    intercept = update_weight * intercept + (1.0 - update_weight) * self.intercept_prev
    slope = update_weight * slope + (1.0 - update_weight) * self.slope_prev
    AFunc = AggregateSavingRule(intercept, slope)  # Make a new next-period capital function
    # Save the new values as "previous" values for the next iteration
    self.intercept_prev = intercept
    self.slope_prev = slope
    # Plot aggregate resources vs aggregate savings for this run and print the new parameters
    if verbose:
        print('intercept=' + str(intercept) + ', slope=' + str(slope) + ', r-sq=' + str(r_value ** 2))
        # plot_start = discard_periods
        # plt.plot(logMagg[plot_start:], logAagg[plot_start:], '.k')
        # plt.show()
    return AggShocksDynamicRule(AFunc)
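Outside the class, the core of the update is small; a standalone sketch (function and parameter names hypothetical) of the lagged log-log regression blended with the previous guess:

import numpy as np
from scipy import stats

def damped_loglinear_update(Magg, Aagg, slope_prev, intercept_prev,
                            discard_periods=200, update_weight=0.5):
    # regress log savings on one-period-lagged log market resources,
    # then blend with the previous parameters to damp oscillation between iterations
    logA = np.log(np.asarray(Aagg)[discard_periods:])
    logM = np.log(np.asarray(Magg)[discard_periods - 1:-1])
    slope, intercept, *_ = stats.linregress(logM, logA)
    return (update_weight * slope + (1.0 - update_weight) * slope_prev,
            update_weight * intercept + (1.0 - update_weight) * intercept_prev)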
import numpy as np
from scipy.optimize import curve_fit
from scipy.stats import linregress

def log_linear_fitting(x, y, method='lsq'):
    """Perform a log-linear regression of y against log(x).
    Parameters
    ----------
    x : ndarray, shape (n_samples,)
        The x values. In our case, it should be the time.
    y : ndarray, shape (n_samples,)
        The y values. In our case, it should be the rpp.
    method : string, optional (default='lsq')
        The method used to perform the regression. The choices are:
        - If 'lsq', an ordinary least-squares approach is used.
        - If 'lm', Levenberg-Marquardt is used.
    Returns
    -------
    slope : float
        Slope of the regression line.
    intercept : float
        Intercept of the regression line.
    std_err : float
        Standard error of the estimate.
    coeff_det : float
        Coefficient of determination.
    """
    # Check that the arrays x and y have the same size
    if x.shape != y.shape:
        raise ValueError('The size of x and y should be the same.')
    if method == 'lsq':
        # Perform the fitting using least squares
        slope, intercept, _, _, _ = linregress(np.log(x), y)
        std_err = res_std_dev(y, log_linear_model(x, slope, intercept))
        coeff_det = r_squared(y, log_linear_model(x, slope, intercept))
    elif method == 'lm':
        # Perform the fitting using non-linear least squares (Levenberg-Marquardt)
        popt, _ = curve_fit(linear_model, np.log(x), y)
        slope = popt[0]
        intercept = popt[1]
        std_err = res_std_dev(y, log_linear_model(x, slope, intercept))
        coeff_det = r_squared(y, log_linear_model(x, slope, intercept))
    else:
        raise NotImplementedError
    return slope, intercept, std_err, coeff_det
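The helpers `linear_model`, `log_linear_model`, `res_std_dev`, and `r_squared` come from elsewhere in the project; definitions consistent with the calls above would be roughly:

import numpy as np

def linear_model(x, slope, intercept):
    return slope * x + intercept

def log_linear_model(x, slope, intercept):
    # linear in log(x): y = slope * log(x) + intercept
    return slope * np.log(x) + intercept

def res_std_dev(y, y_fit):
    return np.std(y - y_fit)   # residual standard deviation (ddof choice is a guess)

def r_squared(y, y_fit):
    ss_res = np.sum((y - y_fit) ** 2)
    ss_tot = np.sum((y - np.mean(y)) ** 2)
    return 1.0 - ss_res / ss_tot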
def SymmetryTest(signal1, signal2, error, binary_names, name_signal1="", comment="ok"):
    """
    From two signals, this function calculates the relative error at each time
    index, and returns the time indexes where anomalies are found.
    Computes the linear regression coefficients as well.
    Inputs :
    - signal1, signal2 : two signals of the class SignalData to compare
      (generally signal1 -> "...CHA" and signal2 -> "...CHB", or "..AMSC1" and "..AMSC2")
    - error : the result will be False if at least one value falls outside the relative error box
    - binary_names : list of binary file names
    - [name_signal1] : optional, a string, the name of the first input.
      Allows the algorithm to check whether the inputs are boolean or not (from specifications).
      By default, the signal is considered non-boolean.
    - [comment] : optional, a string; if it equals 'none' then no result is printed,
      otherwise the result is printed through logger.info
    Outputs :
    - result : a bool, indicates whether the two input signals are the same (within the accepted error)
    - index : time indexes of the signal.data where differences were found
    - lin_reg : an array containing first the type of the signals ('b' for boolean and 'c' for
      continuous), then the slope, the intercept, and finally the R2 value of the linear
      regression between the two signals.
    """
    n = 6  # truncation of digits in the result
    result = True
    error = abs(error)
    index = []
    lin_reg = []
    sig1 = signal1.data
    sig2 = signal2.data
    if is_bool(name_signal1, binary_names):
        # The signals are categorized as boolean: test whether they differ
        for i, s in enumerate(sig1):
            if sig2[i] != s:
                result = False
                index.append(i)
        lin_reg = ["b", " -", " -", " -"]  # boolean signals: no linear regression
    else:
        # The signals are 'reg' (continuous)
        for i, s in enumerate(sig1):
            if s or sig2[i]:  # avoid division by 0
                if abs(2 * (s - sig2[i]) / (abs(s) + abs(sig2[i]))) > error:
                    result = False
                    index.append(i)
        np.seterr(divide="ignore")
        np.seterr(invalid="ignore")
        a, b, r_value, p_value, std_err = stats.linregress(sig1, sig2)
        np.seterr(divide="warn")
        np.seterr(invalid="warn")
        lin_reg = ["c", str(a)[0:n], str(b)[0:n], str(r_value ** 2)]  # continuous signals: linear regression parameters
    logger.info(result)
    if result:
        logger.info("Signals %s and %s are identical (within the accepted error)\n", signal1.name, signal2.name)
    else:
        logger.info("The relative error between the signals exceeds the accepted error over some range\n")
    return result, index, lin_reg
#%%
from scipy.stats import linregress

def get_trend(books, trades):
    '''
    Returns the linear trend in previous trades for each data point in a DataFrame
    of book data
    '''
    def trend(x):
        trades_n = trades.iloc[x.trades_indexes[0]:x.trades_indexes[1]]
        if len(trades_n) < 3:
            return 0
        else:
            return linregress(trades_n.index.values, trades_n.price.values)[0]
    return books.apply(trend, axis=1)
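A toy sketch of the expected inputs: each row of `books` carries a `trades_indexes` tuple delimiting the slice of prior trades to fit (data below is hypothetical):

import pandas as pd

trades = pd.DataFrame({'price': [100.0, 100.5, 101.2, 100.9, 101.8]})
books = pd.DataFrame({'trades_indexes': [(0, 3), (1, 5)]})
print(get_trend(books, trades))   # per-row price slope; 0 where fewer than 3 trades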
# def get_tick_df(min_ts, max_ts, live, convert_timestamps=False):
#     '''
#     Returns a DataFrame of ticks in time range
#     '''
#     if not live:
#         query = {'_id': {'$gt': min_ts, '$lt': max_ts}}
#         cursor = ticks_db.find(query).sort('_id', pymongo.ASCENDING)
#     else:
#         cursor = ticks_db.find({}).sort('$natural', pymongo.DESCENDING).limit(1)
#
#     ticks = pd.DataFrame(list(cursor))
#
#     if not ticks.empty:
#         ticks = ticks.set_index('_id')
#         if convert_timestamps:
#             ticks.index = pd.to_datetime(ticks.index, unit='s')
#     return ticks
#
# def get_ticks_indexes(books, ticks):
#     '''
#     Returns indexes of ticks closest to each data point in a DataFrame
#     of book data
#     '''
#     def ticks_indexes(ts):
#         ts = int(ts)
#         return ticks.index.get_loc(ts, method='nearest')
#     return books.index.map(ticks_indexes)
#
# def get_buys_from_ticks(books, ticks):
#     '''
#     Returns a count of trades for each data point in a DataFrame of book data
#     '''
#     def get_buy(x):
#         return ticks.iloc[x.ticks_indexes].buy
#     return books.apply(get_buy, axis=1)
#
# def get_sells_from_ticks(books, ticks):
#     '''
#     Returns a count of trades for each data point in a DataFrame of book data
#     '''
#     def get_sell(x):
#         return ticks.iloc[x.ticks_indexes].sell
#     return books.apply(get_sell, axis=1)
def validate(self, plot=False, cutoff=0.0, validation_set=None, fname=None):
    '''
    Predict titers of the validation set (a separate set of test_titers set aside previously)
    and compare them against known values. If requested via plot=True,
    a figure comparing predicted and measured titers is produced.
    '''
    from scipy.stats import linregress, pearsonr
    if validation_set is None:
        validation_set = self.test_titers
    self.validation = {}
    for key, val in validation_set.items():
        pred_titer = self.predict_titer(key[0], key[1], cutoff=cutoff)
        self.validation[key] = (val, pred_titer)
    a = np.array(list(self.validation.values()))
    print("number of prediction-measurement pairs", a.shape)
    self.abs_error = np.mean(np.abs(a[:, 0] - a[:, 1]))
    self.rms_error = np.sqrt(np.mean((a[:, 0] - a[:, 1]) ** 2))
    self.slope, self.intercept, tmpa, tmpb, tmpc = linregress(a[:, 0], a[:, 1])
    print("error (abs/rms): ", self.abs_error, self.rms_error)
    print("slope, intercept:", self.slope, self.intercept)
    self.r2 = pearsonr(a[:, 0], a[:, 1])[0] ** 2
    print("pearson correlation:", self.r2)
    if plot:
        import matplotlib.pyplot as plt
        import seaborn as sns
        fs = 16
        sns.set_style('darkgrid')
        plt.figure()
        ax = plt.subplot(111)
        plt.plot([-1, 6], [-1, 6], 'k')
        plt.scatter(a[:, 0], a[:, 1])
        plt.ylabel(r"predicted $\log_2$ distance", fontsize=fs)
        plt.xlabel(r"measured $\log_2$ distance", fontsize=fs)
        ax.tick_params(axis='both', labelsize=fs)
        plt.text(-2.5, 6, 'regularization:\nprediction error:\nR^2:', fontsize=fs - 2)
        plt.text(1.2, 6, str(self.lam_drop) + '/' + str(self.lam_pot) + '/' + str(self.lam_avi) + ' (HI/pot/avi)'
                 + '\n' + str(round(self.abs_error, 2)) + '/' + str(round(self.rms_error, 2)) + ' (abs/rms)'
                 + '\n' + str(self.r2), fontsize=fs - 2)
        plt.tight_layout()
        if fname:
            plt.savefig(fname)
def calc_time_censored_tree_frequencies(self):
    print("fitting time censored tree frequencies")
    # this doesn't interfere with the previous freq estimates via difference in region: global_censored vs global
    region = "global_censored"
    freq_cutoff = 25.0
    pivots_fit = 6
    freq_window = 1.0
    for node in self.nodes:
        node.fit_frequencies = {}
        node.freq_slope = {}
    for time in self.timepoints:
        time_interval = [time - freq_window, time]
        pivots = make_pivots(
            time_interval[0],
            time_interval[1],
            1 / self.pivot_spacing
        )
        node_filter_func = lambda node: node.numdate >= time_interval[0] and node.numdate < time_interval[1]
        # Recalculate tree frequencies for the given time interval and its
        # corresponding pivots.
        tree_freqs = tree_frequencies(self.tree, pivots, node_filter=node_filter_func)
        tree_freqs.estimate_clade_frequencies()
        self.frequencies[time] = tree_freqs.frequencies
        # Annotate censored frequencies on nodes.
        # TODO: replace node-based annotation with dicts indexed by node name.
        for node in self.nodes:
            node.freq = {
                region: self.frequencies[time][node.clade]
            }
            node.logit_freq = {
                region: logit_transform(self.frequencies[time][node.clade], 1e-4)
            }
        for node in self.nodes:
            if node.logit_freq[region] is not None:
                node.fit_frequencies[time] = np.minimum(freq_cutoff, np.maximum(-freq_cutoff, node.logit_freq[region]))
            else:
                node.fit_frequencies[time] = self.node_parents[node].fit_frequencies[time]
            try:
                slope, intercept, rval, pval, stderr = linregress(pivots[pivots_fit:-1], node.fit_frequencies[time][pivots_fit:-1])
                node.freq_slope[time] = slope
            except:
                import ipdb; ipdb.set_trace()
    # reset pivots in tree to global pivots
    self.rootnode.pivots = self.pivots