def _fit(self, dataset):
    est = self.getOrDefault(self.estimator)
    epm = self.getOrDefault(self.estimatorParamMaps)
    numModels = len(epm)
    eva = self.getOrDefault(self.evaluator)
    tRatio = self.getOrDefault(self.trainRatio)
    seed = self.getOrDefault(self.seed)
    randCol = self.uid + "_rand"
    df = dataset.select("*", rand(seed).alias(randCol))
    metrics = [0.0] * numModels
    # split once: rows with rand >= trainRatio form the validation set
    condition = (df[randCol] >= tRatio)
    validation = df.filter(condition)
    train = df.filter(~condition)
    # fit and evaluate one model per parameter map
    for j in range(numModels):
        model = est.fit(train, epm[j])
        metric = eva.evaluate(model.transform(validation, epm[j]))
        metrics[j] += metric
    # pick the best parameter map and refit on the full dataset
    if eva.isLargerBetter():
        bestIndex = np.argmax(metrics)
    else:
        bestIndex = np.argmin(metrics)
    bestModel = est.fit(dataset, epm[bestIndex])
    return self._copyValues(TrainValidationSplitModel(bestModel, metrics))
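A minimal usage sketch for the tuner this method belongs to (PySpark's TrainValidationSplit). The toy estimator, grid, and the train_df DataFrame are assumptions for illustration, not part of the source above:

from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import RegressionEvaluator

lr = LinearRegression(featuresCol="features", labelCol="label")
grid = ParamGridBuilder().addGrid(lr.regParam, [0.01, 0.1, 1.0]).build()
tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid,
                           evaluator=RegressionEvaluator(), trainRatio=0.75)
model = tvs.fit(train_df)  # train_df: an assumed DataFrame with features/label columns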
def contour_to_monitor_coords(screenCnt):
    '''Apply the pyimagesearch algorithm to identify the tl, tr, br, bl
    points of a contour'''
    # now that we have our screen contour, we need to determine
    # the top-left, top-right, bottom-right, and bottom-left
    # points so that we can later warp the image -- we'll start
    # by reshaping our contour into a 4x2 array of points and
    # initializing our output rectangle in top-left, top-right,
    # bottom-right, and bottom-left order
    pts = screenCnt.reshape(4, 2)
    rect = np.zeros((4, 2), dtype="float32")
    # the top-left point has the smallest sum whereas the
    # bottom-right has the largest sum
    s = pts.sum(axis=1)
    rect[0] = pts[np.argmin(s)]
    rect[2] = pts[np.argmax(s)]
    # compute the difference between the points -- the top-right
    # will have the minimum difference and the bottom-left will
    # have the maximum difference
    diff = np.diff(pts, axis=1)
    rect[1] = pts[np.argmin(diff)]
    rect[3] = pts[np.argmax(diff)]
    return rect
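A quick sanity check of the ordering logic, with four corners of a quadrilateral in scrambled order (the coordinates are made up for illustration):

import numpy as np

screenCnt = np.array([[[90, 80]], [[10, 10]], [[100, 5]], [[5, 95]]])  # OpenCV-style contour
rect = contour_to_monitor_coords(screenCnt)
print(rect)  # rows come back as top-left, top-right, bottom-right, bottom-left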
def _get_relative_note_occurences(note_models, pitch_distribution):
    max_rel_occur = 0
    for note_symbol, note in iteritems(note_models):
        try:
            # get the relative occurrence of each note from the pitch
            # distribution
            dists = np.array([abs(note['stable_pitch']['value'] - dist_bin)
                              for dist_bin in pitch_distribution.bins])
        except TypeError:
            logging.info(u'The stable pitch for {0:s} is not computed'
                         .format(note_symbol))
            # use the max peak even if it's weak, far from theoretical, etc.
            peak_idx, heights = note['distribution'].detect_peaks()
            max_peak_ind = peak_idx[np.argmax(heights)]
            max_bin = note['distribution'].bins[max_peak_ind]
            dists = np.array([abs(max_bin - dist_bin)
                              for dist_bin in pitch_distribution.bins])
        peak_ind = np.argmin(dists)
        note['rel_occur'] = pitch_distribution.vals[peak_ind]
        max_rel_occur = max(max_rel_occur, note['rel_occur'])
    return max_rel_occur
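The argmin idiom at the heart of this function is a nearest-bin lookup. A standalone sketch, with bins/vals standing in for pitch_distribution.bins and .vals (assumed toy values):

import numpy as np

bins = np.array([220.0, 233.1, 246.9, 261.6])  # bin centers (Hz)
vals = np.array([0.10, 0.45, 0.30, 0.15])      # relative occurrence per bin
target = 245.0                                  # a note's stable pitch
nearest = np.argmin(np.abs(bins - target))      # index of the closest bin
rel_occur = vals[nearest]                       # -> 0.30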
def _evalAndDer(self, x):
    '''
    Returns the level and first derivative of the function at each value in
    x. Only called internally by HARKinterpolator1D.eval_and_der.
    '''
    m = len(x)
    fx = np.zeros((m, self.funcCount))
    for j in range(self.funcCount):
        fx[:, j] = self.functions[j](x)
    fx[np.isnan(fx)] = np.inf
    i = np.argmin(fx, axis=1)
    y = fx[np.arange(m), i]
    dydx = np.zeros_like(y)
    for j in range(self.funcCount):
        c = i == j
        dydx[c] = self.functions[j].derivative(x[c])
    return y, dydx
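The pattern used here (and in the _derX/_derY variants below) is a lower envelope: evaluate every candidate function at each point, then let argmin pick the smallest value per row. A self-contained numpy sketch with toy functions:

import numpy as np

funcs = [lambda x: x**2, lambda x: 2 - x]     # assumed toy functions
x = np.linspace(-2, 2, 5)
fx = np.stack([f(x) for f in funcs], axis=1)  # shape (m, funcCount)
i = np.argmin(fx, axis=1)                     # index of the lowest function per point
y = fx[np.arange(len(x)), i]                  # envelope values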
def _derX(self, x, y):
    '''
    Returns the first derivative of the function with respect to X at each
    value in (x,y). Only called internally by HARKinterpolator2D._derX.
    '''
    m = len(x)
    temp = np.zeros((m, self.funcCount))
    for j in range(self.funcCount):
        temp[:, j] = self.functions[j](x, y)
    temp[np.isnan(temp)] = np.inf
    i = np.argmin(temp, axis=1)
    dfdx = np.zeros_like(x)
    for j in range(self.funcCount):
        c = i == j
        dfdx[c] = self.functions[j].derivativeX(x[c], y[c])
    return dfdx
def _derY(self, x, y):
    '''
    Returns the first derivative of the function with respect to Y at each
    value in (x,y). Only called internally by HARKinterpolator2D._derY.
    '''
    m = len(x)
    temp = np.zeros((m, self.funcCount))
    for j in range(self.funcCount):
        temp[:, j] = self.functions[j](x, y)
    temp[np.isnan(temp)] = np.inf
    i = np.argmin(temp, axis=1)
    dfdy = np.zeros_like(x)
    for j in range(self.funcCount):
        c = i == j
        dfdy[c] = self.functions[j].derivativeY(x[c], y[c])
    return dfdy
def _derX(self, x, y, z):
    '''
    Returns the first derivative of the function with respect to X at each
    value in (x,y,z). Only called internally by HARKinterpolator3D._derX.
    '''
    m = len(x)
    temp = np.zeros((m, self.funcCount))
    for j in range(self.funcCount):
        temp[:, j] = self.functions[j](x, y, z)
    temp[np.isnan(temp)] = np.inf
    i = np.argmin(temp, axis=1)
    dfdx = np.zeros_like(x)
    for j in range(self.funcCount):
        c = i == j
        dfdx[c] = self.functions[j].derivativeX(x[c], y[c], z[c])
    return dfdx
def _derY(self, x, y, z):
    '''
    Returns the first derivative of the function with respect to Y at each
    value in (x,y,z). Only called internally by HARKinterpolator3D._derY.
    '''
    m = len(x)
    temp = np.zeros((m, self.funcCount))
    for j in range(self.funcCount):
        temp[:, j] = self.functions[j](x, y, z)
    temp[np.isnan(temp)] = np.inf
    i = np.argmin(temp, axis=1)
    dfdy = np.zeros_like(x)
    for j in range(self.funcCount):
        c = i == j
        dfdy[c] = self.functions[j].derivativeY(x[c], y[c], z[c])
    return dfdy
def test_summary_max_shortest_2(self):
    c = ChainConsumer()
    c.add_chain(self.data_skew)
    summary_area = 0.6827
    c.configure(statistics="max_shortest", bins=1.0, summary_area=summary_area)
    summary = c.analysis.get_summary()['0']

    xs = np.linspace(-1, 5, 1000)
    pdf = skewnorm.pdf(xs, 5, 1, 1.5)
    cdf = skewnorm.cdf(xs, 5, 1, 1.5)
    x2 = interp1d(cdf, xs, bounds_error=False, fill_value=np.inf)(cdf + summary_area)
    dist = x2 - xs
    ind = np.argmin(dist)
    x0 = xs[ind]
    x2 = x2[ind]
    xmax = xs[pdf.argmax()]

    assert np.isclose(xmax, summary[1], atol=0.05)
    assert np.isclose(x0, summary[0], atol=0.05)
    assert np.isclose(x2, summary[2], atol=0.05)
def test_summary_max_shortest_3(self):
    c = ChainConsumer()
    c.add_chain(self.data_skew)
    summary_area = 0.95
    c.configure(statistics="max_shortest", bins=1.0, summary_area=summary_area)
    summary = c.analysis.get_summary()['0']

    xs = np.linspace(-1, 5, 1000)
    pdf = skewnorm.pdf(xs, 5, 1, 1.5)
    cdf = skewnorm.cdf(xs, 5, 1, 1.5)
    x2 = interp1d(cdf, xs, bounds_error=False, fill_value=np.inf)(cdf + summary_area)
    dist = x2 - xs
    ind = np.argmin(dist)
    x0 = xs[ind]
    x2 = x2[ind]
    xmax = xs[pdf.argmax()]

    assert np.isclose(xmax, summary[1], atol=0.05)
    assert np.isclose(x0, summary[0], atol=0.05)
    assert np.isclose(x2, summary[2], atol=0.05)
def find_closest(t, v, t0, v0):
    """ Find the closest point on the curve f = a + b/x
    to the given point (t, v)
    """
    a = v0
    b = v0*t0
    # solve for the stationary points of the squared distance (a quartic in x)
    eqn_coefs = [1/b, -t/b, 0, v - a, -b]
    tis = np.roots(eqn_coefs)
    tis = tis[abs(tis.imag/tis.real) < 0.01].real  # we only care about real solutions
    tis = tis[tis > 0]                             # and positive ones
    # choose the closest among the solutions
    ds = abs(tis - t)*np.sqrt(1 + np.power(tis, 4)/(b*b))  # distance from each solution to (t, v)
    idx = np.argmin(ds)
    ti = tis[idx]
    vi = a + b/ti
    return ti, vi
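A quick check under simple inputs: with t0 = 1, v0 = 1 the fitted curve is f(x) = 1 + 1/x, and for the query point (2, 2) the quartic has a positive real root between x = 1 and x = 2:

import numpy as np

ti, vi = find_closest(2.0, 2.0, 1.0, 1.0)
print(ti, vi)  # vi == 1 + 1/ti by construction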
def find_null_offset(xpts, powers, default=0.0):
    """Finds the offset corresponding to the minimum power using a fit to the measured data."""
    def model(x, a, b, c):
        return a*(x - b)**2 + c
    powers = np.power(10, powers/10.)  # convert from dB to linear power
    min_idx = np.argmin(powers)
    try:
        fit = curve_fit(model, xpts, powers, p0=[1, xpts[min_idx], powers[min_idx]])
    except RuntimeError:
        logger.warning("Mixer null offset fit failed.")
        return default, xpts, np.zeros(len(powers))
    best_offset = np.real(fit[0][1])
    # clamp the fitted minimum to the measured range
    best_offset = np.minimum(best_offset, xpts[-1])
    best_offset = np.maximum(best_offset, xpts[0])
    xpts_fine = np.linspace(xpts[0], xpts[-1], 101)
    fit_pts = np.array([np.real(model(x, *fit[0])) for x in xpts_fine])
    if min(fit_pts) < 0:
        fit_pts -= min(fit_pts) - 1e-10  # prevent taking the log of a negative number
    return best_offset, xpts_fine, 10*np.log10(fit_pts)
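A happy-path demo on synthetic data (a sketch; the failure branch also needs a logger in scope, which this example never hits):

import numpy as np
from scipy.optimize import curve_fit  # used inside find_null_offset

xpts = np.linspace(-1, 1, 21)
powers = 10*np.log10(0.5*(xpts - 0.2)**2 + 1e-3)  # quadratic null near 0.2, in dB
offset, xfine, fit_db = find_null_offset(xpts, powers)
print(offset)  # should land close to 0.2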
def test_reduce_argmin():
    def argmin(ndarray, axis, keepdims=False):
        res = np.argmin(ndarray, axis=axis)
        if keepdims:
            res = np.expand_dims(res, axis=axis)
        return res

    data = np.array([[[5, 1], [20, 2]], [[30, 1], [40, 2]],
                     [[55, 1], [60, 2]]], dtype=np.float32)

    assert np.array_equal(import_and_compute('ArgMin', data, axis=0),
                          argmin(data, keepdims=True, axis=0))
    assert np.array_equal(import_and_compute('ArgMin', data, axis=0, keepdims=0),
                          argmin(data, keepdims=False, axis=0))
    assert np.array_equal(import_and_compute('ArgMin', data, axis=1),
                          argmin(data, keepdims=True, axis=1))
    assert np.array_equal(import_and_compute('ArgMin', data, axis=1, keepdims=0),
                          argmin(data, keepdims=False, axis=1))
    assert np.array_equal(import_and_compute('ArgMin', data, axis=2),
                          argmin(data, keepdims=True, axis=2))
    assert np.array_equal(import_and_compute('ArgMin', data, axis=2, keepdims=0),
                          argmin(data, keepdims=False, axis=2))
def logistic_var():
    """
    Finds a variance to match probit and logistic regression.

    Finds a variance :math:`\\tau_w` such that,

    :math:`p = P(W < z) \\approx \\frac{1}{1+e^{-z}},`

    where :math:`W \\sim {\\mathcal N}(0,\\tau_w)`.
    """
    z = np.linspace(-5, 5, 1000)        # z points to test
    p1 = 1/(1 + np.exp(-z))             # target probability
    var_test = np.linspace(2, 3, 1000)  # candidate variances
    err = []
    for v in var_test:
        p2 = 0.5*(1 + scipy.special.erf(z/np.sqrt(v*2)))  # normal CDF with variance v
        err.append(np.mean((p1 - p2)**2))
    i = np.argmin(err)
    wvar = var_test[i]
    return wvar
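A usage check: with the matched variance, the normal CDF tracks the logistic sigmoid closely over the same grid:

import numpy as np
import scipy.special

wvar = logistic_var()
z = np.linspace(-5, 5, 1000)
gap = np.max(np.abs(1/(1 + np.exp(-z))
                    - 0.5*(1 + scipy.special.erf(z/np.sqrt(2*wvar)))))
print(wvar, gap)  # the maximum gap should be on the order of 0.01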
def NLS_annealing(F, xi, yi, p, N=100, n=10, sigma=5., factor=0.5):
    # N = size of population in one iteration
    # n = number of iterations
    # sigma = initial (multiplicative) standard deviation
    # factor = factor to reduce sigma per iteration
    print("initial", p)
    p = np.atleast_1d(p)
    dim = len(p)
    # make initial sigma act like multiplication by sigma^(+-1)
    sigma = np.log(sigma)*np.ones(dim)
    for k in range(n):
        # create new population by adding multiplicative gaussian noise
        P = p[None, :] * np.exp(np.random.randn(N, dim) * sigma[None, :])
        # compute mean square loss on population
        f = np.mean((F(xi[None, :], P) - yi)**2, 1)
        # replace p by new best guess
        p = P[np.argmin(f), :]
        # update sigma
        sigma *= factor
    print("parameters:", p)
    print("minimum", min(f))
    return tuple(p)
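A toy fit, assuming F is vectorized over a population of parameter rows (shape (N, dim)) broadcasting against xi of shape (1, m), which is how the loop above calls it:

import numpy as np

def F(x, P):
    a, b = P[:, 0:1], P[:, 1:2]   # one parameter per column, one candidate per row
    return a * np.exp(-b * x)     # broadcasts to shape (N, m)

xi = np.linspace(0, 4, 50)
yi = 2.0 * np.exp(-1.5 * xi)      # noise-free target with a=2, b=1.5
a_fit, b_fit = NLS_annealing(F, xi, yi, p=[1.0, 1.0])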
def furthest_point_sample(vertices, faces, N, K):
    num_vertices = vertices.shape[0]
    # start from random seeds; centers from index K onward are replaced greedily
    center_indices = np.random.choice(num_vertices, N, replace=False)
    sqr_dists = 1e10 * np.ones(num_vertices)            # squared distance to nearest center so far
    vertex_as = np.zeros(num_vertices, dtype=np.int32)  # nearest-center assignment per vertex
    for i in range(N):
        new_sqr_dists = np.sum(np.square(vertices - vertices[center_indices[i]]), 1)
        update_mask = new_sqr_dists < sqr_dists
        sqr_dists[update_mask] = new_sqr_dists[update_mask]
        vertex_as[update_mask] = i
        next_center = np.argmax(sqr_dists)  # vertex furthest from all current centers
        if K - 1 <= i < N - 1:
            center_indices[i + 1] = next_center
    centers = vertices[center_indices]
    # assign each face to its nearest center; sqr_dist is an external helper
    # assumed to return pairwise squared distances between the two point sets
    face_centers = np.mean(vertices[faces], 1)
    sqr_dists = sqr_dist(centers, face_centers)
    face_as = np.argmin(sqr_dists, 1)
    return center_indices, vertex_as, face_as
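The greedy core of furthest point sampling, stripped of the mesh bookkeeping, as a standalone sketch on random points:

import numpy as np

points = np.random.rand(100, 3)
centers = [0]                      # seed with an arbitrary point
d = np.full(len(points), np.inf)   # squared distance to the nearest center
for _ in range(4):
    d = np.minimum(d, np.sum((points - points[centers[-1]])**2, axis=1))
    centers.append(int(np.argmax(d)))  # next center: the furthest point so far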
def createLists(dbFilename):
    print('Splitting RIRs into sets...')
    sets = [
        RirSet('train', 0.8),
        RirSet('test', 0.1),
        RirSet('dev', 0.1),
    ]
    # open database
    rirDb = json.load(open(dbFilename))
    rirs = sorted(list(rirDb.keys()))
    # to distribute the RIRs over the sets we could shuffle, but since they are
    # in alphabetical order, simply iterating over them guarantees that the
    # different conditions are distributed (mostly) equally across the sets
    sets[0].add(rirs[0])
    for i in range(1, len(rirs)):
        si = np.argmin([s.missing(i) for s in sets])
        sets[si].add(rirs[i])
    # save set files
    util.createDirectory(ListDir)
    for s in sets:
        s.save(ListDir)
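The balancing idea in plain Python, assuming RirSet.missing(i) reports how far a set is below its target share after i items (an assumption; the class is not shown here):

targets = {'train': 0.8, 'test': 0.1, 'dev': 0.1}
counts = {name: 0 for name in targets}
for i in range(1, 101):
    name = max(targets, key=lambda s: targets[s] - counts[s]/i)  # largest deficit
    counts[name] += 1
print(counts)  # roughly {'train': 80, 'test': 10, 'dev': 10}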
def lat_lng_to_usaf_station(lat, lng):
    """Return the closest USAF station ID using latitude and
    longitude coordinates.

    Parameters
    ----------
    lat : float
        Latitude coordinate.
    lng : float
        Longitude coordinate.

    Returns
    -------
    station : str, None
        String representing a USAF weather station ID or None, if none was
        found.
    """
    if lat is None or lng is None:
        return None
    usaf_station_to_lat_lng_index = _load_usaf_station_to_lat_lng_index()
    index_list = list(usaf_station_to_lat_lng_index.items())
    dists = [haversine(lat, lng, stat_lat, stat_lng)
             for _, (stat_lat, stat_lng) in index_list]
    return index_list[np.argmin(dists)][0]
def lat_lng_to_tmy3_station(lat, lng):
    """Return the closest TMY3 station ID using latitude and
    longitude coordinates.

    Parameters
    ----------
    lat : float
        Latitude coordinate.
    lng : float
        Longitude coordinate.

    Returns
    -------
    station : str, None
        String representing a TMY3 weather station ID or None, if none was
        found.
    """
    if lat is None or lng is None:
        return None
    tmy3_station_to_lat_lng_index = _load_tmy3_station_to_lat_lng_index()
    index_list = list(tmy3_station_to_lat_lng_index.items())
    dists = [haversine(lat, lng, stat_lat, stat_lng)
             for _, (stat_lat, stat_lng) in index_list]
    return index_list[np.argmin(dists)][0]
def lat_lng_to_zipcode(lat, lng):
    """Return the closest ZIP code using latitude and
    longitude coordinates.

    Parameters
    ----------
    lat : float
        Latitude coordinate.
    lng : float
        Longitude coordinate.

    Returns
    -------
    zipcode : str, None
        String representing a USPS ZIP code, or None, if none was found.
    """
    if lat is None or lng is None:
        return None
    zipcode_to_lat_lng_index = _load_zipcode_to_lat_lng_index()
    index_list = list(zipcode_to_lat_lng_index.items())
    dists = [haversine(lat, lng, zip_lat, zip_lng)
             for _, (zip_lat, zip_lng) in index_list]
    return index_list[np.argmin(dists)][0]
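All three lookups share one pattern: brute-force nearest neighbor under the haversine distance. A standalone sketch; this haversine helper and the two station entries are illustrative assumptions, not the module's own implementation or index:

import numpy as np
from math import radians, sin, cos, asin, sqrt

def haversine(lat1, lng1, lat2, lng2):
    # great-circle distance in kilometers
    lat1, lng1, lat2, lng2 = map(radians, (lat1, lng1, lat2, lng2))
    a = sin((lat2 - lat1)/2)**2 + cos(lat1)*cos(lat2)*sin((lng2 - lng1)/2)**2
    return 2 * 6371 * asin(sqrt(a))

stations = {'724940': (37.62, -122.37), '722900': (32.73, -117.17)}
index_list = list(stations.items())
dists = [haversine(37.77, -122.42, s_lat, s_lng)  # query point: San Francisco
         for _, (s_lat, s_lng) in index_list]
print(index_list[np.argmin(dists)][0])  # the first station is far closer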