def slope_create(F0,F1):
# Calculate slopes (OLS)
slope, intercept, r_value, p_value, std_err = stats.linregress(F0,F1)
# Slope with robust regression
x = sm.add_constant(F0)
y = F1
rlm_results = sm.RLM(y,x, M=sm.robust.norms.HuberT()).fit()
slope_r = rlm_results.params[-1]
intercept_r = rlm_results.params[0]
del x,y,rlm_results
return slope, intercept, r_value, slope_r, intercept_r
python类add_constant()的实例源码
cloud_frequency.py 文件源码
项目:Cloud-variability-time-frequency
作者: florentbrient
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def _residualNet(data,uselog=True,c=None,p=None,x=None,useaggregate=True,numericalControls=[],categoricalControls=[]):
'''
Given the data on a bipartite network of the form source,target,flow
Parameters
----------
data : pandas.DataFrame
Raw data. It has source,target,volume (trade, number of people etc.).
c,p,x : str (optional)
Labels of the columns in data used for source,target,volume.
If not provided it will use the first, second, and third.
numericalControls : list
List of columns to use as numerical controls.
categoricalControls : list
List of columns to use as categorical controls.
uselog : boolean (True)
If True it will use the logarithm of the provided weight.
useaggregate : boolean (True)
If true it will calculate the aggregate of the volume on both sides (c and p) and use as numbercal controls.
Returns
-------
net : pandas.Dataframe
Table with c,p,x,x_res, where x_res is the residual of regressing x on the given control variables.
'''
c = data.columns.values[0] if c is None else c
p = data.columns.values[1] if p is None else p
x = data.columns.values[2] if x is None else x
data_ = data[[c,p,x]+numericalControls+categoricalControls]
if useaggregate:
data_ = merge(data_,data.groupby(c).sum()[[x]].reset_index().rename(columns={x:x+'_'+c}))
data_ = merge(data_,data.groupby(p).sum()[[x]].reset_index().rename(columns={x:x+'_'+p}))
numericalControls+=[x+'_'+c,x+'_'+p]
if uselog:
data_ = data_[data_[x]!=0]
data_[x] = np.log10(data_[x])
if useaggregate:
data_[x+'_'+c] = np.log10(data_[x+'_'+c])
data_[x+'_'+p] = np.log10(data_[x+'_'+p])
_categoricalControls = []
for var in ser(categoricalControls):
vals = list(set(data_[var]))
for v in vals[1:]:
_categoricalControls.append(var+'_'+str(v))
data_[var+'_'+str(v)]=0
data_.loc[data_[var]==v,var+'_'+str(v)]=1
Y = data_[x].values
X = data_[list(set(numericalControls))+list(set(_categoricalControls))].values
X = sm.add_constant(X)
model = sm.OLS(Y,X).fit()
data_[x+'_res'] = Y-model.predict(X)
return data_[[c,p,x,x+'_res']]
def wt_plot(self, pdf):
"""
Create a plot of the linear fit of the wild type variant.
*pdf* is an open PdfPages instance.
Only created for selections that use WLS or OLS scoring and have a wild type specified.
Uses :py:func:`~plots.fit_axes` for the plotting.
"""
logging.info("Creating wild type fit plot", extra={'oname' : self.name})
# get the data and calculate log ratios
if "variants" in self.labels:
wt_label = "variants"
elif "identifiers" in self.labels:
wt_label = "identifiers"
data = self.store.select("/main/{}/counts".format(wt_label), where='index = "{}"'.format(WILD_TYPE_VARIANT)).ix[0]
sums = self.store['/main/{}/counts'.format(wt_label)].sum(axis="index") # sum of complete cases (N')
yvalues = np.log(data + 0.5) - np.log(sums + 0.5)
xvalues = [tp / float(max(self.timepoints)) for tp in self.timepoints]
# fit the line
X = sm.add_constant(xvalues) # fit intercept
if self.scoring_method == "WLS":
W = 1 / (1 / (data + 0.5) + 1 / (sums + 0.5))
fit = sm.WLS(yvalues, X, weights=W).fit()
elif self.scoring_method == "OLS":
fit = sm.OLS(yvalues, X).fit()
else:
raise ValueError('Invalid regression scoring method "{}" [{}]'.format(self.scoring_method, self.name))
intercept, slope = fit.params
slope_se = fit.bse['x1']
# make the plot
fig, ax = plt.subplots()
fig.set_tight_layout(True)
fit_axes(ax, xvalues, yvalues, slope, intercept, xlabels=self.timepoints)
fit_axes_text(ax, cornertext="Slope {:3.2f}\nSE {:.1f}".format(slope, slope_se))
ax.set_title("Wild Type Shape\n{}".format(self.name))
ax.set_ylabel("Log Ratio (Complete Cases)")
pdf.savefig(fig)
plt.close(fig)
def test_linear_model_time_series(data):
mod = TradedFactorModel(data.portfolios, data.factors)
mod.fit()
res = mod.fit()
get_all(res)
all_params = []
all_tstats = []
nobs, nport = data.portfolios.shape
nf = data.factors.shape[1]
e = np.zeros((nobs, (nport * (nf + 1))))
x = np.zeros((nobs, (nport * (nf + 1))))
factors = sm.add_constant(data.factors)
loc = 0
for i in range(data.portfolios.shape[1]):
if isinstance(data.portfolios, pd.DataFrame):
p = data.portfolios.iloc[:, i:(i + 1)]
else:
p = data.portfolios[:, i:(i + 1)]
ols_res = _OLS(p, factors).fit(cov_type='robust', debiased=True)
all_params.extend(list(ols_res.params))
all_tstats.extend(list(ols_res.tstats))
x[:, loc:(loc + nf + 1)] = factors
e[:, loc:(loc + nf + 1)] = ols_res.resids.values[:, None]
loc += nf + 1
cov = res.cov.values[(nf + 1) * i:(nf + 1) * (i + 1), (nf + 1) * i:(nf + 1) * (i + 1)]
ols_cov = ols_res.cov.values
assert_allclose(cov, ols_cov)
assert_allclose(res.params.values.ravel(), np.array(all_params))
assert_allclose(res.tstats.values.ravel(), np.array(all_tstats))
assert_allclose(res.risk_premia, np.asarray(factors).mean(0)[1:])
xpxi_direct = np.eye((nf + 1) * nport + nf)
f = np.asarray(factors)
fpfi = np.linalg.inv(f.T @ f / nobs)
nfp1 = nf + 1
for i in range(nport):
st, en = i * nfp1, (i + 1) * nfp1
xpxi_direct[st:en, st:en] = fpfi
f = np.asarray(factors)[:, 1:]
xe = np.c_[x * e, f - f.mean(0)[None, :]]
xeex_direct = xe.T @ xe / nobs
cov = xpxi_direct @ xeex_direct @ xpxi_direct / (nobs - nfp1)
assert_allclose(cov, res.cov.values)
alphas = np.array(all_params)[0::nfp1][:, None]
alpha_cov = cov[0:(nfp1 * nport):nfp1, 0:(nfp1 * nport):nfp1]
stat_direct = float(alphas.T @ np.linalg.inv(alpha_cov) @ alphas)
assert_allclose(res.j_statistic.stat, stat_direct)
assert_allclose(1.0 - stats.chi2.cdf(stat_direct, nport), res.j_statistic.pval)
def scatterColor(x0, y, w):
"""Creates scatter plot with points colored by variable.
All input arrays must have matching lengths
:param x0: x values to plot
:type x0: list
:param y: y values to plot
:type y: list
:param w: z values to plot
:returns: plot; slope and intercept of the RLM best fit line shown on the plot
.. warning:: all input arrays must have matching lengths and scalar values
.. note:: See documentation at http://statsmodels.sourceforge.net/0.6.0/generated/statsmodels.robust.robust_linear_model.RLM.html
for the RLM line
"""
import matplotlib as mpl
import matplotlib.cm as cm
import statsmodels.api as sm
from scipy.stats import linregress
cmap = plt.cm.get_cmap('RdYlBu')
norm = mpl.colors.Normalize(vmin=w.min(), vmax=w.max())
m = cm.ScalarMappable(norm=norm, cmap=cmap)
m.set_array(w)
sc = plt.scatter(x0, y, label='', color=m.to_rgba(w))
xa = sm.add_constant(x0)
est = sm.RLM(y, xa).fit()
r2 = sm.WLS(y, xa, weights=est.weights).fit().rsquared
slope = est.params[1]
x_prime = np.linspace(np.min(x0), np.max(x0), 100)[:, np.newaxis]
x_prime = sm.add_constant(x_prime)
y_hat = est.predict(x_prime)
const = est.params[0]
y2 = [i * slope + const for i in x0]
lin = linregress(x0, y)
x1 = np.arange(np.min(x0), np.max(x0), 0.1)
y1 = [i * lin[0] + lin[1] for i in x1]
y2 = [i * slope + const for i in x1]
plt.plot(x1, y1, c='g',
label='simple linear regression m = {:.2f} b = {:.0f}, r^2 = {:.2f}'.format(lin[0], lin[1], lin[2] ** 2))
plt.plot(x1, y2, c='r', label='rlm regression m = {:.2f} b = {:.0f}, r2 = {:.2f}'.format(slope, const, r2))
plt.legend()
cbar = plt.colorbar(m)
cbar.set_label('Julian Date')
return slope, const