def FORCAST(self, param):
class Context:
def __init__(self, N):
self.N = N
self.q = deque([], self.N)
self.x = [i for i in range(self.N)]
def handleInput(self, value):
if len(self.q) < self.N:
self.q.append(value)
return np.NaN
z1 = np.polyfit(self.x, self.q, 1)
fn = np.poly1d(z1)
y = fn(self.N + 1)
self.q.append(value)
return y
ctx = Context(param[1])
result = param[0].apply(ctx.handleInput)
return result
#????
python类poly1d()的实例源码
def __init__(self, calibration_file, supply_voltage_method, readout_voltage_method):
super(HallProbe, self).__init__()
self.name = "Lakeshore Hall Probe"
with open(calibration_file) as cf:
lines = [l for l in cf.readlines() if l[0] != '#']
if len(lines) != 2:
raise Exception("Invalid Hall probe calibration file, must contain two lines.")
try:
self.output_voltage = float(lines[0])
except:
raise TypeError("Could not convert output voltage to floating point value.")
try:
poly_coeffs = np.array(lines[1].split(), dtype=np.float)
self.field_vs_voltage = np.poly1d(poly_coeffs)
except:
raise TypeError("Could not convert calibration coefficients into list of floats")
self.getter = readout_voltage_method
self.setter = supply_voltage_method
self.setter(self.output_voltage)
def __init__(self, calibration_file, field_getter, current_setter, current_getter, field_averages=5):
super(Electromagnet, self).__init__()
self.name = "Composite Magnet Instrument"
with open(calibration_file) as cf:
lines = [l for l in cf.readlines() if l[0] != '#']
if len(lines) != 1:
raise Exception("Invalid magnet control calibration file, must contain one line.")
try:
# Construct the fit
poly_coeffs = np.array(lines[0].split(), dtype=np.float)
self.current_vs_field = np.poly1d(poly_coeffs)
except:
raise TypeError("Could not convert calibration coefficients into list of floats")
self.field_getter = field_getter
self.current_setter = current_setter
self.current_getter = current_getter
self.field_averages = field_averages
self.calibrated_slope = poly_coeffs[0]
def fitted_iv_curve(self,TES):
'''
make a curve from the fit parameters
'''
filterinfo=self.filterinfo(TES)
if filterinfo==None:return None
offset=self.offset(TES)
fit=filterinfo['fit']
self.TES=TES # this is required for the "mixed" and "combined" models
istart,iend=self.selected_iv_curve(TES)
bias=self.bias_factor*self.vbias[istart:iend]
# polynomial fit
if 'fitfunction' not in fit.keys() or fit['fitfunction']=='POLYNOMIAL':
func=np.poly1d(fit['fitinfo'][0]) + offset
f=func(bias)
return bias,f
# combined polynomial fit
Vsuper,Vnormal,a0,a1,b0,b1,b2,b3,c0,c1=fit['fitinfo'][0]
f=self.model_iv_combined(bias,Vsuper,Vnormal,a0,a1,b0,b1,b2,b3,c0,c1) + offset
return bias,f
def test_str_leading_zeros(self):
p = np.poly1d([4, 3, 2, 1])
p[3] = 0
assert_equal(str(p),
" 2\n"
"3 x + 2 x + 1")
p = np.poly1d([1, 2])
p[0] = 0
p[1] = 0
assert_equal(str(p), " \n0")
def trendLine(self, axis_choose=None):
stable_sec= int(self.record_sec_le.text())
stable_count = int(stable_sec * (1/0.007))
if axis_choose:
axis = axis_choose
else:
axis = str(self.axis_combobox.currentText())
x = self.raw_data['time'][:stable_count]
y = self.raw_data[axis][:stable_count]
coefficients = np.polyfit(x,y,1)
p = np.poly1d(coefficients)
coefficient_of_dermination = r2_score(y, p(x))
self.trendLine_content1_label.setText("Trendline: " + str(p))
self.trendLine_content2_label.setText("R: " + str(coefficient_of_dermination))
return coefficients
def compute_xvvr(self):
""" Return xvv(r) matrix """
r = np.array([i*self.dr for i in range(self.ngrid)])
k = self.get_k()
xvvr = [["" for i in range(self.nsites)] for j in range(self.nsites)]
for i in range(self.nsites):
for j in range(self.nsites):
xvvk_ij = self.xvv_data[:,i,j]
xvvr_ij = pubfft.sinfti(xvvk_ij*k, self.dr, -1)/r
# n_pots_for_interp = 6
# r_for_interp = r[1:n_pots_for_interp+1]
# xvvr_for_interp = xvvr_ij[:n_pots_for_interp]
# poly_coefs = np.polyfit(r_for_interp, xvvr_for_interp, 3)
# poly_f = np.poly1d(poly_coefs)
# xvvr[i][j] = [poly_f(0)]
xvvr[i][j] = xvvr_ij
return r, np.swapaxes(xvvr, 0, 2)
def compute_zr(self):
""" Return z(r) matrix """
r = np.array([i*self.dr for i in range(self.ngrid)])
k, zk = self.compute_zk()
print 'computed zk',zk.shape
zr = [["" for i in range(self.nsites)] for j in range(self.nsites)]
for i in range(self.nsites):
for j in range(self.nsites):
zk_ij = zk[1:,i,j]
zr_ij = pubfft.sinfti(zk_ij*k[1:], self.dr, -1)/r[1:]
#zr_ij = np.abs(fftpack.fft(zk_ij))
n_pots_for_interp = 6
r_for_interp = r[1:n_pots_for_interp+1]
zr_for_interp = zr_ij[:n_pots_for_interp]
poly_coefs = np.polyfit(r_for_interp, zr_for_interp, 3)
poly_f = np.poly1d(poly_coefs)
zr[i][j] = [poly_f(0)]
zr[i][j].extend(zr_ij)
return r, np.swapaxes(zr, 0, 2)
def smooth(x, y, weights):
'''
in case the NLF cannot be described by
a square root function
commit bounded polynomial interpolation
'''
# Spline hard to smooth properly, therefore solfed with
# bounded polynomal interpolation
# ext=3: no extrapolation, but boundary value
# return UnivariateSpline(x, y, w=weights,
# s=len(y)*weights.max()*100, ext=3)
# return np.poly1d(np.polyfit(x,y,w=weights,deg=2))
p = np.polyfit(x, y, w=weights, deg=2)
if np.any(np.isnan(p)):
# couldn't even do polynomial fit
# as last option: assume constant noise
my = np.average(y, weights=weights)
return lambda x: my
return lambda xint: np.poly1d(p)(np.clip(xint, x[0], x[-1]))
def nonlin_poly(self, u):
"""nonlin_poly
ip2d.motortransfer_func legacy
"""
# olimm1 = 0.5
olim = 2
# z = array([ 0.27924011, 0.12622341, 0.0330395, -0.00490162])
# z = array([ 0.00804775, 0.00223221, -0.1456263, -0.04297434, 0.74612441, 0.26178644, -0.01953301, -0.00243736])
# FIXME: somewhere there's a spearate script for generating the coeffs
z = array([9.46569349e-04, 4.84698808e-03, -1.64436822e-02, -8.76479549e-02, 7.67630339e-02, 4.48107332e-01, -4.53365904e-03, -2.69288039e-04, 1.18423789e-15])
p3 = poly1d(z)
# print "pre", self.ip2d.u[ti]
# self.ip2d.u[ti] = p3(tanh(self.ip2d.u[ti]) * self.olim)
y = p3(tanh(u) * olim)
return y
def callback_click_ok(self,data):
points = np.array(self.interal_data) #[(1, 1), (2, 2), (3, 3), (7, 3), (9, 3)]
# get x and y vectors
x = points[:,0]
y = points[:,1]
# calculate polynomial
terms=int(self.sp.value())
self.ret = np.polyfit(x, y, terms)
f = np.poly1d(self.ret)
tot=""
val=0
for i in range(0,len(self.ret)):
p=len(self.ret)-1-i
tot=tot+str(self.ret[i])+"*pow(w,"+str(p)+")"+"+"
tot=tot[:-1]
self.ret_math=tot
self.close()
def model(self, p, x):
"""
A parameteric model to fit y = f(x, p)
This can be overridden in a class that inherits from this one to make a new model
Parameters:
===========
- p: Any iterable
The model parameters
- x: numpy.ndarray
The dependent variable
Returns:
========
A model at each of the x locations
"""
return np.poly1d(p)(x)
def build_model_poly(detection_pairs, beacon_sdoa, nominal_sample_rate, deg=2):
if len(detection_pairs) < deg + 1:
# not enough beacon transmissions
return None
soa0 = np.array([d[0].soa for d in detection_pairs])
soa1 = np.array([d[1].soa for d in detection_pairs])
soa1at0 = soa1 + np.array(beacon_sdoa)
coef = np.polyfit(soa1at0, soa0, deg)
fit = np.poly1d(coef)
# residuals = soa0 - fit(soa1at0)
# print(np.mean(residuals))
def evaluate(det0, det1):
return (det0.soa - fit(det1.soa)) / nominal_sample_rate
return evaluate
def redo_fit(self):
lx0, lx1 = self.power_plot_lr.getRegion()
x0, x1 = 10**lx0, 10**lx1
X = self.dat['power_meter_power']
n = len(X)
ii0 = np.argmin(np.abs(X[:n//2+1]-x0))
ii1 = np.argmin(np.abs(X[:n//2+1]-x1))
print(ii0,ii1)
m, b = np.polyfit(np.log10(X[ii0:ii1]), np.log10(self.power_plot_y[ii0:ii1]), deg=1)
print("fit", m,b)
fit_data = 10**(np.poly1d((m,b))(np.log10(X)))
print("fit_data", fit_data)
self.power_fit_plotcurve.setData(X, fit_data)
self.fit_text.setHtml("<h1>I<sup>{:1.2f}</sup></h1>".format(m))
self.fit_text.setPos(0.5*(lx0+lx1), np.log10(fit_data[(ii0+ii1)//2]))
def bez2poly(bez, numpy_ordering=True, return_poly1d=False):
"""Converts a Bezier object or tuple of Bezier control points to a tuple
of coefficients of the expanded polynomial.
return_poly1d : returns a numpy.poly1d object. This makes computations
of derivatives/anti-derivatives and many other operations quite quick.
numpy_ordering : By default (to accommodate numpy) the coefficients will
be output in reverse standard order.
Note: This function is redundant thanks to the .poly() method included
with all bezier segment classes."""
if is_bezier_segment(bez):
bez = bez.bpoints()
return bezier2polynomial(bez,
numpy_ordering=numpy_ordering,
return_poly1d=return_poly1d)
# Geometric####################################################################
def polynomial2bezier(poly):
"""Converts a cubic or lower order Polynomial object (or a sequence of
coefficients) to a CubicBezier, QuadraticBezier, or Line object as
appropriate."""
if isinstance(poly, poly1d):
c = poly.coeffs
else:
c = poly
order = len(c)-1
if order == 3:
bpoints = (c[3], c[2]/3 + c[3], (c[1] + 2*c[2])/3 + c[3],
c[0] + c[1] + c[2] + c[3])
elif order == 2:
bpoints = (c[2], c[1]/2 + c[2], c[0] + c[1] + c[2])
elif order == 1:
bpoints = (c[1], c[0] + c[1])
else:
raise AssertionError("This function is only implemented for linear, "
"quadratic, and cubic polynomials.")
return bpoints
# Curve Splitting #############################################################
def _calculate_HS_GL_polynomial(HS_break, id_axis, a_cohort_before_start_MS_elongation_1, TT_hs_0, TT_hs_break, TT_flag_ligulation, n0, n1, n2, t0, t1, a, c, a_cohort_before_start_MS_elongation_2):
# define HS(TT)
HS_1 = np.poly1d([a_cohort_before_start_MS_elongation_1, - a_cohort_before_start_MS_elongation_1 * TT_hs_0]) # index_phytomer < HS_break
if HS_break is None: # linear
HS_2 = None # index_phytomer >= HS_break
else: # bilinear
HS_2 = np.poly1d([a_cohort_before_start_MS_elongation_2, - a_cohort_before_start_MS_elongation_2 * TT_hs_break + HS_break]) # index_phytomer >= HS_break
# define GL(TT) for all phases except TT < t0 (because it depends on index_phytomer)
if id_axis == 'MS':
GL_2 = np.poly1d([(n1 - n0) / (t1 - t0), n0 - t0 * (n1 - n0) / (t1 - t0)])
GL_3 = np.poly1d([(n2 - n1) / (TT_flag_ligulation - t1), n1 - t1 * (n2 - n1) / (TT_flag_ligulation - t1)])
else: # tillers
if np.isnan(t0): # only 3 phases
GL_2 = np.poly1d([n1 / (t1 - TT_hs_0), n1 * TT_hs_0 / (TT_hs_0 - t1)])
else:
GL_2 = np.poly1d([(n1 - n0) / (t1 - t0), n1 - t1 * (n1 - n0) / (t1 - t0)])
GL_3 = np.poly1d([(n2 - n1) / (TT_flag_ligulation - t1), n2 - TT_flag_ligulation * (n2 - n1) / (TT_flag_ligulation - t1)])
GL_4 = np.poly1d([a, - 3 * a * TT_flag_ligulation, 3 * a * TT_flag_ligulation**2 + c, - a * TT_flag_ligulation**3 - c * TT_flag_ligulation + n2])
return HS_1, HS_2, GL_2, GL_3, GL_4
def fast_search(prob, dtype=np.float32):
size = len(prob)
fk = np.zeros((size + 1), dtype=dtype)
C = np.zeros((size + 1, size + 1), dtype=dtype)
S = np.empty((2 * size + 1), dtype=dtype)
S[:] = np.nan
for k in range(1, 2 * size + 1):
S[k] = 1./k
roots = (prob - 1.0) / prob
for k in range(size, 0, -1):
poly = np.poly1d(roots[0:k], True)
factor = np.multiply.reduce(prob[0:k])
C[k, 0:k+1] = poly.coeffs[::-1]*factor
for k1 in range(size + 1):
fk[k] += (1. + 1.) * k1 * C[k, k1]*S[k + k1]
for i in range(1, 2*(k-1)):
S[i] = (1. - prob[k-1])*S[i] + prob[k-1]*S[i+1]
return fk
def _interp2_r0(Data, Pow2factor, kind, disp=False):
if disp:
p30 = np.poly1d(np.polyfit([10, 30, 50, 60, 70, 80],
np.log([0.8, 1.3, 6, 12, 28, 56]), 2))
print('Expectd time for NxN data:', np.exp(p30(Data.shape[0])))
x = np.arange(Data.shape[1])
y = np.arange(Data.shape[0])
xv, yv = np.meshgrid(x, y)
f = interpolate.interp2d(xv, yv, Data, kind=kind)
xnew = np.arange(0, Data.shape[1], 1 / (2**Pow2factor))
ynew = np.arange(0, Data.shape[0], 1 / (2**Pow2factor))
Upsampled = f(xnew, ynew)
return Upsampled
def approx(x1, y1):
# calculate polynomial
z = np.polyfit(x1, y1, 1)
print(z)
f = np.poly1d(z)
# calculate new x's and y's
x_new = np.linspace(x1[0], x1[-1], len(x1))
y_new = f(x_new)
with open('a.txt', 'a') as f:
for i in range(len(x_new)):
f.write(json.dumps(
{'time': x_new[i],
'acceleration': y_new[i]
})+'\n')
return x_new, y_new
def poly_fit(x, y, degree):
results = {}
co_effs = np.polyfit(x, y, degree)
results['poly_normal'] = co_effs
p = np.poly1d(co_effs)
y_hat = p(x)
y_bar = np.sum(y) / len(y)
ss_reg = np.sum((y_hat - y_bar) ** 2)
ss_tot = np.sum((y - y_bar) ** 2)
results['determination'] = ss_reg / ss_tot
return results
# noinspection PyTypeChecker
def regenerateCalibration(self):
B=self.R[1]
A=self.R[0]
intercept = self.R[0]
if self.gain!=None:
gain = self.gain_values[self.gain]
B = B/gain
A = A/gain
slope = B-A
intercept = A
if self.calibrationReady and self.gain!=8 : #special case for 1/11. gain
self.calPoly10 = self.__cal10__
self.calPoly12 = self.__cal12__
else:
self.calPoly10 = np.poly1d([0,slope/1023.,intercept])
self.calPoly12 = np.poly1d([0,slope/4095.,intercept])
self.voltToCode10 = np.poly1d([0,1023./slope,-1023*intercept/slope])
self.voltToCode12 = np.poly1d([0,4095./slope,-4095*intercept/slope])
def regenerateCalibration(self):
B=self.R[1]
A=self.R[0]
intercept = self.R[0]
if self.gain!=None:
gain = self.gain_values[self.gain]
B = B/gain
A = A/gain
slope = B-A
intercept = A
if self.calibrationReady and self.gain!=8 : #special case for 1/11. gain
self.calPoly10 = self.__cal10__
self.calPoly12 = self.__cal12__
else:
self.calPoly10 = np.poly1d([0,slope/1023.,intercept])
self.calPoly12 = np.poly1d([0,slope/4095.,intercept])
self.voltToCode10 = np.poly1d([0,1023./slope,-1023*intercept/slope])
self.voltToCode12 = np.poly1d([0,4095./slope,-4095*intercept/slope])
def fit(self, X):
_X = self.__aggregate_dataset(X)
self.polynomial = np.polyfit(_X['expenses'].astype(np.long),
_X['distance_traveled'].astype(np.long),
3)
self._polynomial_fn = np.poly1d(self.polynomial)
return self
def bolcor(teff):
"""
The bolometric correction
Input
-----
teff : int
Effective temperature in K
Output
------
bcflow : float
The bolometric correction
"""
lteff = np.log10(teff)
if lteff < 3.7:
p = [-0.190537291496456e+05, 0.155144866764412e+05, -0.421278819301717e+04, 0.381476328422343e+03]
elif (3.7 <= lteff) and (lteff < 3.9):
p = [-0.370510203809015e+05, 0.385672629965804e+05, -0.150651486316025e+05, 0.261724637119416e+04, -0.170623810323864e+03]
else:
p = [-0.118115450538963e+06, 0.137145973583929e+06, -0.636233812100225e+05, 0.147412923562646e+05, -0.170587278406872e+04, 0.788731721804990e+02]
# The arrays are in form of:
# p[0] + p[1]*x + p[2]*x**2 + ...
# But np.poly1d expects it inversed
bcflow = np.poly1d(p[::-1])(lteff)
return bcflow
def test_poly1d(self, level=rlevel):
# Ticket #28
assert_equal(np.poly1d([1]) - np.poly1d([1, 0]),
np.poly1d([-1, 1]))
def test_poly1d_nan_roots(self, level=rlevel):
# Ticket #396
p = np.poly1d([np.nan, np.nan, 1], r=0)
self.assertRaises(np.linalg.LinAlgError, getattr, p, "r")
def test_poly_div(self, level=rlevel):
# Ticket #553
u = np.poly1d([1, 2, 3])
v = np.poly1d([1, 2, 3, 4, 5])
q, r = np.polydiv(u, v)
assert_equal(q*v + r, u)
def test_poly_eq(self, level=rlevel):
# Ticket #554
x = np.poly1d([1, 2, 3])
y = np.poly1d([3, 4])
assert_(x != y)
assert_(x == x)
def test_polyder_return_type(self):
# Ticket #1249
assert_(isinstance(np.polyder(np.poly1d([1]), 0), np.poly1d))
assert_(isinstance(np.polyder([1], 0), np.ndarray))
assert_(isinstance(np.polyder(np.poly1d([1]), 1), np.poly1d))
assert_(isinstance(np.polyder([1], 1), np.ndarray))