def CalcBufferGridPoints(x_buffer, y_buffer, polygon_buffer, spacing_grid):
"""Finds the grid points for our buffer around the fault."""
x_fill_vec = np.arange(np.min(x_buffer), np.max(x_buffer), spacing_grid)
y_fill_vec = np.arange(np.min(y_buffer), np.max(y_buffer), spacing_grid)
x_buffer_fill_grid, y_buffer_fill_grid = np.meshgrid(x_fill_vec, y_fill_vec)
# Select only those grid points inside of buffered region.
x_buffer_fill_grid = x_buffer_fill_grid.flatten()
y_buffer_fill_grid = y_buffer_fill_grid.flatten()
indices_to_delete = []
for i in range(len(x_buffer_fill_grid)):
candidate = Point(x_buffer_fill_grid[i], y_buffer_fill_grid[i])
if not polygon_buffer.contains(candidate):
indices_to_delete.append(i)
x_buffer_fill_grid = np.delete(x_buffer_fill_grid, indices_to_delete)
y_buffer_fill_grid = np.delete(y_buffer_fill_grid, indices_to_delete)
return (x_buffer_fill_grid, y_buffer_fill_grid)
python类Point()的实例源码
def split_line(line, max_line_units):
"""Checks the line's length and splits in half if larger than the configured max
:param line: Shapely line to be split
:param max_line_units: The maximum allowed length of the line
"""
if line.length <= max_line_units:
return [line]
half_length = line.length / 2
coords = list(line.coords)
for idx, point in enumerate(coords):
proj_dist = line.project(Point(point))
if proj_dist == half_length:
return [LineString(coords[:idx + 1]), LineString(coords[idx:])]
if proj_dist > half_length:
mid_point = line.interpolate(half_length)
head_line = LineString(coords[:idx] + [(mid_point.x, mid_point.y)])
tail_line = LineString([(mid_point.x, mid_point.y)] + coords[idx:])
return split_line(head_line, max_line_units) + split_line(tail_line, max_line_units)
def is_point_on_land(coords, shape_file=None):
"""Checks if a coords are over land or over water. This is done useing
a shape file of world boundaries and looping over all Polygons to see
if the point is in any of it.
"""
import fiona
from shapely.geometry import Point, shape
if not shape_file:
shape_file='/home/data/mapdata/other/tm_world/TM_WORLD_BORDERS-0.3.shp'
lon, lat=coords
pt=Point(lon, lat)
try:
fc=fiona.open(shape_file)
except:
pass
result=False
for feature in fc:
if shape(feature['geometry']).contains(pt):
result=True
fc.close()
return result
def plot_lines(lines, **kwargs):
import matplotlib
def plot_line(ob):
x, y = ob.xy
matplotlib.pyplot.plot(x, y, linewidth=1, solid_capstyle='round',
zorder=1, **kwargs)
for u in lines:
if u.geom_type in ['LineString', 'LinearRing', 'Point']:
plot_line(u)
elif u.geom_type is 'MultiLineString':
for p in u:
plot_line(p)
def line_contains_point(line, point, buf=10e-5):
p = Point(point).buffer(buf)
return line.intersects(p)
def _register_node(self, p):
if isinstance(p, Point):
p = list(p.coords)[0]
elif isinstance(p, tuple):
pass
else:
raise TypeError('The point should be shapely::Point or a 2-tuple')
if p not in self.node_ids:
nid = self.nodes_counter
self.node_ids[p] = nid
self.node_xy[nid] = p
self.nodes_counter += 1
return self.node_ids[p]
def point_projects_to_edges(self, point, distance_tolerance=0.01):
"""
Project a point to graph edges considering specific distance tolerance.
Note the tolerance is measured by great circle distance to point per se.
:param point: a shapely Point instance or (lon, lat) tuple
:param distance_tolerance: tolerance of distance in km
:return: a list of projected edges, reversely sorted by offsets.
"""
point_buffer = distance_to_buffer(distance_tolerance)
p_buf = Point(point).buffer(point_buffer)
projected_edges = []
projected_segments = []
major = self.major_component()
for i in range(0, len(major)):
line_index = major[i]
line = self.geoms[line_index]
if line.intersects(p_buf):
cuts = self.line_cuts(line_index)
if cuts is None:
continue
for j in range(1, len(cuts)):
sinx = cuts[j - 1]
einx = cuts[j]
segment = line.coords[sinx:einx + 1]
ls = LineString(segment)
if ls.intersects(p_buf):
edge = self.edge_key(segment[0], segment[-1])
offset = ls.distance(Point(point)) # no buffer
projected_edges.append((edge, offset))
projected_segments.append(segment)
result = sorted(projected_edges, key=lambda x: x[1], reverse=True)
edges = list(set([i[0] for i in result]))
return edges, projected_segments
def test_costdistance_finite(self):
def zero_one(kv):
k = kv[0]
return (k.col == 0 and k.row == 1)
result = cost_distance(self.raster_rdd,
geometries=[Point(13, 13)], max_distance=144000.0)
tile = result.to_numpy_rdd().filter(zero_one).first()[1]
point_distance = tile.cells[0][1][3]
self.assertEqual(point_distance, 0.0)
def test_costdistance_finite_int(self):
def zero_one(kv):
k = kv[0]
return (k.col == 0 and k.row == 1)
result = cost_distance(self.raster_rdd,
geometries=[Point(13, 13)], max_distance=144000)
tile = result.to_numpy_rdd().filter(zero_one).first()[1]
point_distance = tile.cells[0][1][3]
self.assertEqual(point_distance, 0.0)
def test_costdistance_infinite(self):
def zero_one(kv):
k = kv[0]
return (k.col == 0 and k.row == 1)
result = cost_distance(self.raster_rdd,
geometries=[Point(13, 13)], max_distance=float('inf'))
tile = result.to_numpy_rdd().filter(zero_one).first()[1]
point_distance = tile.cells[0][0][0]
self.assertTrue(point_distance > 1250000)
def test_euclideandistance(self):
def mapTransform(layoutDefinition, spatialKey):
ex = layoutDefinition.extent
x_range = ex.xmax - ex.xmin
xinc = x_range/layoutDefinition.tileLayout.layoutCols
yrange = ex.ymax - ex.ymin
yinc = yrange/layoutDefinition.tileLayout.layoutRows
return {'xmin': ex.xmin + xinc * spatialKey['col'],
'xmax': ex.xmin + xinc * (spatialKey['col'] + 1),
'ymin': ex.ymax - yinc * (spatialKey['row'] + 1),
'ymax': ex.ymax - yinc * spatialKey['row']}
def gridToMap(layoutDefinition, spatialKey, px, py):
ex = mapTransform(layoutDefinition, spatialKey)
x_range = ex['xmax'] - ex['xmin']
xinc = x_range/layoutDefinition.tileLayout.tileCols
yrange = ex['ymax'] - ex['ymin']
yinc = yrange/layoutDefinition.tileLayout.tileRows
return (ex['xmin'] + xinc * (px + 0.5), ex['ymax'] - yinc * (py + 0.5))
def distanceToGeom(layoutDefinition, spatialKey, geom, px, py):
x, y = gridToMap(layoutDefinition, spatialKey, px, py)
return geom.distance(Point(x, y))
tiled = euclidean_distance(self.pts_wm, 3857, 7)
result = tiled.stitch().cells[0]
arr = np.zeros((256,256), dtype=float)
it = np.nditer(arr, flags=['multi_index'])
while not it.finished:
py, px = it.multi_index
arr[py][px] = distanceToGeom(tiled.layer_metadata.layout_definition,
{'col': 64, 'row':63},
self.pts_wm,
px,
py)
it.iternext()
self.assertTrue(np.all(abs(result - arr) < 1e-8))
def in_us(lat, lon):
p = Point(lon, lat)
for state, poly in state_polygons.iteritems():
if poly.contains(p):
return state
return None
def get_state_from_coordinates(coordnates):
#coordinates = np.array([(34, -118), (40.7, -74)])
sf = shapefile.Reader('./data/us_states_st99/st99_d00')
#sf = shapefile.Reader("./data/states/cb_2015_us_state_20m")
shapes = sf.shapes()
#shapes[i].points
fields = sf.fields
records = sf.records()
state_polygons = defaultdict(list)
for i, record in enumerate(records):
state = record[5]
points = shapes[i].points
poly = shape(shapes[i])
state_polygons[state].append(poly)
coor_state = {}
for i in range(coordnates.shape[0]):
lat, lon = coordnates[i]
for state, polies in state_polygons.iteritems():
for poly in polies:
point = Point(lon, lat)
if poly.contains(point):
coor_state[(lat, lon)] = state.lower()
return coor_state
def in_us(lat, lon):
p = Point(lon, lat)
for state, poly in state_polygons.iteritems():
if poly.contains(p):
return state
return None
def bbox_for_track(track):
"""
Get the bounding box for the track.
:param MultiPoint|Point track:
:return:
"""
return BBox(track.bounds[1], track.bounds[0], track.bounds[3], track.bounds[2])
def __mk_track(track_points):
if len(track_points) > 1:
return MultiPoint(track_points)
elif len(track_points) == 1:
return Point(track_points[0])
else:
__LOG.critical(u'Selected GPX have no valid track points')
print u'Selected GPX have no valid track points'
sys.exit(404)
def load_gpx(filename):
"""
:param filename: The file to load the track for.
:return [Point|MultiPoint], BBox:
The point or line read from the GPX track file.
And the bounding box as a 4-tuple.
"""
__LOG.debug(u'Opening GPX file: %s' % filename)
try:
with open(filename, 'r') as gpx_file:
tree = ElementTree.parse(gpx_file)
root = tree.getroot()
except IOError as e:
__LOG.error(u'Failed to read %s: %s' % (filename, e.message))
raise e
tracks = []
for trk in root.findall('{http://www.topografix.com/GPX/1/1}trk'):
for seg in trk.findall('{http://www.topografix.com/GPX/1/1}trkseg'):
track_points = []
for point in seg.findall('{http://www.topografix.com/GPX/1/1}trkpt'):
trk_pt = ([float(point.get('lon')), float(point.get('lat'))])
track_points.append(trk_pt)
tracks.append(__mk_track(track_points))
for trk in root.findall('{http://www.topografix.com/GPX/1/0}trk'):
for seg in trk.findall('{http://www.topografix.com/GPX/1/0}trkseg'):
track_points = []
for point in seg.findall('{http://www.topografix.com/GPX/1/0}trkpt'):
trk_pt = ([float(point.get('lon')), float(point.get('lat'))])
track_points.append(trk_pt)
tracks.append(__mk_track(track_points))
return tracks
def store_kml(obj, obj_id, admin_level, name=u'unknown'):
"""
Store a shapely geometry object as a KML file.
:param BaseGeometry obj: The object to store
:param int obj_id: The object ID of region.
:param int admin_level: Admin level of region [default=0]
:param str|unicode name: Name of the region to store in KML.
:return:
"""
ascii_name = gpx_utils.enforce_unicode(name).encode('ascii', 'replace')
filename = './%s_%s.kml' % (ascii_name.replace(' ', '_'), obj_id)
__LOG.info(u'store_kml: storing a %s with size: %s ', obj.geom_type, obj.area)
ns = '{http://www.opengis.net/kml/2.2}'
sls = styles.LineStyle(color='ffff0000')
sas = styles.PolyStyle(color='5500ff00')
sps = styles.BalloonStyle(bgColor='ff0000ff')
style = styles.Style(styles=[sls, sas, sps])
kf = kml.KML(ns)
if obj.geom_type == 'LineString' or obj.geom_type == 'MultiLineString' or obj.geom_type == 'LinearRing':
d = kml.Document(ns, str(obj_id), 'Traces', 'GPX Visualization')
elif obj.geom_type == 'Polygon' or obj.geom_type == 'MultiPolygon':
d = kml.Document(ns, str(obj_id), 'Border of {0} ({1})'.format(ascii_name, obj_id), 'Border visualization')
else:
d = kml.Document(ns, str(obj_id), 'Points', 'Point visualization')
kf.append(d)
p = kml.Placemark(ns, str(obj_id), ascii_name, '{0}'.format(ascii_name), styles=[style])
p.geometry = obj
d.append(p)
fil = open(filename, 'w')
fil.write(kf.to_string(prettyprint=True))
fil.flush()
fil.close()
__LOG.debug(u'store_kml: store successful (%s/%s) -> %s' % (admin_level, obj_id, filename))
def points_to_shapely_polygon(sets_of_points):
# sets of points are lists using str(z) as keys
# each item is an ordered list of points representing a polygon, each point is a 3-item list [x, y, z]
# polygon n is inside polygon n-1, then the current accumulated polygon is
# polygon n subtracted from the accumulated polygon up to and including polygon n-1
# Same method DICOM uses to handle rings and islands
composite_polygon = []
for set_of_points in sets_of_points:
if len(set_of_points) > 3:
points = [(point[0], point[1]) for point in set_of_points]
points.append(points[0]) # Explicitly connect the final point to the first
# if there are multiple sets of points in a slice, each set is a polygon,
# interior polygons are subtractions, exterior are addition
# Only need to check one point for interior vs exterior
current_polygon = Polygon(points).buffer(0) # clean stray points
if composite_polygon:
if Point((points[0][0], points[0][1])).disjoint(composite_polygon):
composite_polygon = composite_polygon.union(current_polygon)
else:
composite_polygon = composite_polygon.symmetric_difference(current_polygon)
else:
composite_polygon = current_polygon
return composite_polygon
def geom(self):
return ShapelyPoint(self.co)