import numpy as np
from scipy.interpolate import NearestNDInterpolator

def interp_plume_dv(plume_model, pts_file, fname_out='plume.xy', zero_center=True):
    '''
    Takes a plume model file, interpolates it to a new grid, and writes a new file.

    args--------------------------------------------------------------------------
    plume_model: the plume model file path (cartesian coordinate system)
    pts_file: a file containing the points to interpolate to
    fname_out: the name of the output file
    zero_center: if False, shift the plume axis to the middle of the new grid
    '''
    f1 = np.loadtxt(plume_model)
    x = f1[:, 0]
    y = 6371.0 - f1[:, 1]   # 6371 km Earth radius: convert radial coordinate to depth
    v = f1[:, 2]

    f2 = np.loadtxt(pts_file, skiprows=1)
    x_new = f2[:, 0]
    y_new = f2[:, 1]

    # Mirror the plume about x = 0 so both sides of the axis are covered.
    x_mirror = x * -1.0
    x_pts = np.hstack((x, x_mirror))
    y_pts = np.hstack((y, y))
    v_pts = np.hstack((v, v))

    if not zero_center:
        x_pts += max(x_new) / 2

    interpolator = NearestNDInterpolator((x_pts, y_pts), v_pts)
    v_new = interpolator(x_new, y_new)

    with open(fname_out, 'w') as f3:
        for i in range(len(x_new)):
            f3.write('{} {} {}\n'.format(x_new[i], y_new[i], v_new[i]))
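
A minimal call might look like the following; the file names are placeholders, assuming whitespace-delimited columns as read by np.loadtxt above (three columns in the plume model, and a one-line header followed by x, depth columns in the points file).

interp_plume_dv('plume_model.dat', 'grid_pts.dat', fname_out='plume.xy', zero_center=True)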
def _build_vertex_values(self, items, area_func, value_func):
    """
    Interpolate vertices with known altitudes to get altitudes for the remaining ones.
    """
    vertex_values = np.empty(self.vertices.shape[:1], dtype=np.int32)
    vertex_value_mask = np.full(self.vertices.shape[:1], fill_value=False, dtype=bool)

    for item in items:
        i_vertices = np.unique(self.faces[np.array(tuple(chain(*area_func(item).faces)))].flatten())
        vertex_values[i_vertices] = value_func(item, i_vertices)
        vertex_value_mask[i_vertices] = True

    if not np.all(vertex_value_mask):
        interpolate = NearestNDInterpolator(self.vertices[vertex_value_mask],
                                            vertex_values[vertex_value_mask])
        vertex_values[np.logical_not(vertex_value_mask)] = interpolate(
            *np.transpose(self.vertices[np.logical_not(vertex_value_mask)])
        )

    return vertex_values
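
Stripped of the class context, the core pattern above is: fit a nearest-neighbour interpolator on the vertices whose values are known and evaluate it at the rest. A self-contained sketch with made-up arrays:

import numpy as np
from scipy.interpolate import NearestNDInterpolator

vertices = np.random.rand(200, 3)    # all mesh vertex coordinates
known = np.zeros(200, dtype=bool)
known[:20] = True                    # pretend the first 20 vertices have values
values = np.zeros(200)
values[known] = np.arange(20, dtype=float)

# Fill the unknown vertices from their nearest known neighbours.
interp = NearestNDInterpolator(vertices[known], values[known])
values[~known] = interp(*np.transpose(vertices[~known]))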
def __init__(self, scenario: model.Scenario):
    """Initialize the model."""
    super().__init__(scenario)
    s = self._scenario

    flag_rs = flag_ss = flag_ns = 0
    if s.mechanism == 'SS':
        flag_ss = 1
    elif s.mechanism == 'NS':
        flag_ns = 1
    elif s.mechanism == 'RS':
        flag_rs = 1

    event = (s.mag, s.depth_hyp, flag_rs, flag_ss, flag_ns, s.dist_jb,
             s.v_s30)

    global INTERPOLATOR
    if INTERPOLATOR is None:
        # Lazily load the event table and build the nearest-neighbour lookup once.
        with np.load(fname_data) as data:
            INTERPOLATOR = NearestNDInterpolator(data['events'],
                                                 data['predictions'])

    prediction = INTERPOLATOR(event)
    self._ln_resp = prediction[0::2]
    self._ln_std = np.sqrt(prediction[1::2])
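
Here NearestNDInterpolator is effectively a lookup table: the prediction vector of the nearest stored event is returned whole and then split into interleaved response and variance terms. A reduced sketch of that pattern with fabricated arrays (the real data come from the fname_data archive above):

import numpy as np
from scipy.interpolate import NearestNDInterpolator

events = np.random.rand(50, 7)        # columns: mag, depth, flag_rs, flag_ss, flag_ns, dist_jb, v_s30
predictions = np.random.rand(50, 10)  # interleaved ln-response and variance values

lookup = NearestNDInterpolator(events, predictions)
event = (6.5, 10.0, 0, 1, 0, 20.0, 760.0)   # hypothetical scenario
prediction = lookup(event)                  # shape (10,): the nearest event's row
ln_resp, ln_std = prediction[0::2], np.sqrt(prediction[1::2])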
def surface_transform(vals, subj, hemi, direction="reverse"):
    """Transform a surface scalar map using the spherical transform.

    Parameters
    ----------
    vals : array or Series
        Scalar value map to transform.
    subj : string
        Freesurfer subject ID.
    hemi : lh | rh
        Hemisphere the data are defined on.
    direction : reverse | forward
        Whether the transformation should be from group space to subject space
        (reverse) or the other direction (forward).

    Returns
    -------
    out_vals : array or Series
        Scalar value map defined on the new surface.

    """
    data_dir = PROJECT["data_dir"]
    sphere_reg_fname = op.join(data_dir, subj, "surf", hemi + ".sphere.reg")
    avg_sphere_fname = op.join(data_dir, "fsaverage/surf", hemi + ".sphere")

    sphere_reg, _ = nib.freesurfer.read_geometry(sphere_reg_fname)
    avg_sphere, _ = nib.freesurfer.read_geometry(avg_sphere_fname)

    if direction.startswith("f"):
        src_sphere, trg_sphere = sphere_reg, avg_sphere
    elif direction.startswith("r"):
        src_sphere, trg_sphere = avg_sphere, sphere_reg

    interpolator = interpolate.NearestNDInterpolator(src_sphere, vals)
    out_vals = interpolator(trg_sphere)

    if isinstance(vals, pd.Series):
        out_vals = pd.Series(out_vals)

    return out_vals
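
A hedged usage sketch, assuming a FreeSurfer-style directory tree under PROJECT['data_dir'] and a per-vertex map read with nibabel (the subject ID and path below are placeholders, not from the original code):

import nibabel as nib

thickness = nib.freesurfer.read_morph_data("/path/to/data_dir/subj01/surf/lh.thickness")
group_map = surface_transform(thickness, "subj01", "lh", direction="forward")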
def fill_missing(mask, x, y, u, v):
    """Fill missing values by interpolating from the nearest neighbours."""
    # Construct the interpolators from the boundary values surrounding the
    # missing values.
    boundaries = find_boundaries(u.mask, mode='outer')
    points = np.stack((y[boundaries], x[boundaries])).T
    ip_u = NearestNDInterpolator(points, u[boundaries], rescale=False)
    ip_v = NearestNDInterpolator(points, v[boundaries], rescale=False)

    # Interpolate only the missing values.
    missing = (y[mask], x[mask])
    u[mask] = ip_u(missing)
    v[mask] = ip_v(missing)
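
A minimal sketch of calling this on masked velocity fields; the data are synthetic, find_boundaries is assumed to be skimage.segmentation.find_boundaries, and the coordinate grids come from np.mgrid:

import numpy as np

ny, nx = 32, 32
y, x = np.mgrid[0:ny, 0:nx]
u = np.ma.array(np.random.rand(ny, nx))
v = np.ma.array(np.random.rand(ny, nx))
u[5:9, 5:9] = np.ma.masked     # introduce a block of missing vectors
v[5:9, 5:9] = np.ma.masked

fill_missing(u.mask.copy(), x, y, u, v)   # fills the masked block in place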
def __init__(self, data, categorical_parameters, continuous_parameters, func=None, order=1):
    self.key_columns = categorical_parameters
    self.parameter_columns = continuous_parameters
    self.func = func

    if len(self.parameter_columns) not in [1, 2]:
        raise ValueError("Only interpolation over 1 or 2 variables is supported")
    if len(self.parameter_columns) == 1 and order == 0:
        raise ValueError("Order 0 only supported for 2d interpolation")

    # These are the columns which the interpolation function will approximate
    value_columns = sorted(data.columns.difference(set(self.key_columns) | set(self.parameter_columns)))

    if self.key_columns:
        # Since there are key_columns we need to group the table by those
        # columns to get the sub-tables to fit
        sub_tables = data.groupby(self.key_columns)
    else:
        # There are no key columns so we will fit the whole table
        sub_tables = {None: data}.items()

    self.interpolations = {}

    for key, base_table in sub_tables:
        if base_table.empty:
            continue
        # For each permutation of the key columns build interpolations
        self.interpolations[key] = {}
        for value_column in value_columns:
            # For each value in the table build an interpolation function
            if len(self.parameter_columns) == 2:
                # 2 variable interpolation
                if order == 0:
                    x = base_table[list(self.parameter_columns)]
                    y = base_table[value_column]
                    func = interpolate.NearestNDInterpolator(x=x.values, y=y.values)
                else:
                    index, column = self.parameter_columns
                    table = base_table.pivot(index=index, columns=column, values=value_column)
                    x = table.index.values
                    y = table.columns.values
                    z = table.values
                    func = interpolate.RectBivariateSpline(x=x, y=y, z=z, ky=order, kx=order).ev
            else:
                # 1 variable interpolation
                base_table = base_table.sort_values(by=self.parameter_columns[0])
                x = base_table[self.parameter_columns[0]]
                y = base_table[value_column]
                func = interpolate.InterpolatedUnivariateSpline(x, y, k=order)
            self.interpolations[key][value_column] = func
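
For the order-0 branch, the pattern is simply passing the two parameter columns as an (n, 2) point array and one value column as the values. A stripped-down sketch with a made-up table:

import pandas as pd
from scipy import interpolate

base_table = pd.DataFrame({
    'age':  [20, 20, 40, 40],
    'year': [2000, 2010, 2000, 2010],
    'rate': [0.1, 0.2, 0.3, 0.4],
})
x = base_table[['age', 'year']]
y = base_table['rate']
func = interpolate.NearestNDInterpolator(x=x.values, y=y.values)

print(func(22, 2001))   # nearest table row is (age=20, year=2000) -> 0.1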