def test_intersection_com_mock_2(self):
ls = LineString([(1, 1, 9.48024060e+08), (2, 2, 9.49363260e+08),
(3, 1, 9.51868860e+08)])
poly = Polygon([(1, 1), (1, 3), (4, 3), (4, 1), (1, 1)])
self.traj2.intersection_shapely = MagicMock(return_value=ls)
response = self.traj2.intersection_shapely(poly)
ls = np.array(ls)
trajMock = self.traj2.to_Trajectory(response)
traj = Trajectory(ls[:, 0], ls[:, 1], ls[:, 2])
assert (np.array_equal(trajMock.getX(), traj.getX()))
assert (np.array_equal(trajMock.getY(), traj.getY()))
assert (np.array_equal(trajMock.getTime(), traj.getTime()))
python类LineString()的实例源码
def subgraph_within_box(self, bounding_box):
"""
Extract a subgraph bounded by a box.
:param bounding_box: the bounding coordinates in
(minx, miny, maxx, maxy) or a Polygon instance
:return: a subgraph of nx.Graph
"""
if isinstance(bounding_box, Polygon):
bbox = bounding_box
else:
bbox = box(bounding_box[0], bounding_box[1],
bounding_box[2], bounding_box[3])
nbunch = set()
for edge in self.graph.edges():
s, e = edge
if bbox.intersects(LineString([self.node_xy[s], self.node_xy[e]])):
nbunch.add(s)
nbunch.add(e)
return self.graph.subgraph(nbunch)
def substring_3d(linestring, from_, to_):
"the linestring a shapely geometry, from_ and to_ are in length units"
tot_len = 0
sq = lambda x: x*x
def interpolate(a, b, ratio):
return (a[0] + ratio*(b[0] - a[0]), a[1] + ratio*(b[1] - a[1]), a[2] + ratio*(b[2] - a[2]))
res = []
for s, e in zip(linestring.coords[0:-1], linestring.coords[1:]):
length = sqrt(sq(s[0]-e[0]) + sq(s[1]-e[1]) + sq(s[2]-e[2]))
tot_len += length
if tot_len > from_:
if not len(res):
#interpolate first
res.append(interpolate(e, s, (tot_len - from_)/length))
if tot_len >= to_:
#interpolate last
res.append(interpolate(e, s, (tot_len - to_)/length))
break
res.append(e)
return LineString(res)
def compute_segment_geometry(feature1_wkt, feature2_wkt):
""" Returns a geometry (LineString) connecting features centers """
def mysum(x, a, b):
return [(a[i] + b[i]) * x for i in range(0, 3)]
def bary(coords):
return reduce(lambda x, y: mysum(1.0 / len(coords), x, y), coords)
geom1 = loads(feature1_wkt)
centroid1_with_z = bary(geom1.coords)
geomB = loads(feature2_wkt)
centroid2_with_z = bary(geomB.coords)
result = LineString([centroid1_with_z, centroid2_with_z])
if not result.is_valid:
raise Exception(
'Cannot compute segment geometry connecting {} and {}'.format(
feature1_wkt, feature2_wkt))
return result
def create_frames(mapfile):
# increment in steps of 3 units from 1 to 35
min_buffer = 1
max_buffer = 35
step = 3
# create a line
line = LineString([(0, 0), (100, 100), (0, 200), (200, 200), (300, 100), (100, 0)])
# set the map extent to this line
mapfile["extent"] = " ".join(map(str, line.buffer(max_buffer*2).bounds))
all_images = []
# create expanding buffers
for dist in range(min_buffer, max_buffer, step):
all_images.append(create_frame(mapfile, line, dist))
# create shrinking buffers
for dist in range(max_buffer, min_buffer, -step):
all_images.append(create_frame(mapfile, line, dist))
return all_images
def generate_polygon(points):
from shapely.geometry import Polygon, Point, LineString
p = []
p.append(points[0][1])
for point in points:
p.append(point[0])
p.append(points[-1][1])
for point in points[::-1]:
p.append(point[2])
eclipse_boundary = Polygon(p)
p = []
for point in points:
p.append(point[1])
center_line = LineString(p)
return eclipse_boundary, center_line
def generate_polygon(points):
from shapely.geometry import Polygon, Point, LineString
p = []
p.append(points[0][1])
for point in points:
p.append(point[0])
p.append(points[-1][1])
for point in points[::-1]:
p.append(point[2])
eclipse_boundary = Polygon(p)
p = []
for point in points:
p.append(point[1])
center_line = LineString(p)
return eclipse_boundary, center_line
def _geometry_to_svg(self, geom):
# scale and move geometry and create svg code for it
if isinstance(geom, Polygon):
return ('<path d="' +
' '.join((('M %.1f %.1f L'+(' %.1f %.1f'*(len(ring.coords)-1))+' z') %
tuple((np.array(ring)*self.np_scale+self.np_offset).flatten()))
for ring in chain((geom.exterior,), geom.interiors))
+ '"/>').replace('.0 ', ' ')
if isinstance(geom, LineString):
return (('<path d="M %.1f %.1f L'+(' %.1f %.1f'*(len(geom.coords)-1))+'"/>') %
tuple((np.array(geom)*self.np_scale+self.np_offset).flatten())).replace('.0 ', ' ')
try:
geoms = geom.geoms
except AttributeError:
return ''
return ''.join(self._geometry_to_svg(g) for g in geoms)
def nodes_for_point(self, point, all_nodes):
point = Point(point.x, point.y)
nodes = {}
if self.nodes:
for node in self.nodes:
node = all_nodes[node]
line = LineString([(node.x, node.y), (point.x, point.y)])
if line.length < 5 and not self.clear_geometry_prep.intersects(line):
nodes[node.i] = (None, None)
if not nodes:
nearest_node = min(tuple(all_nodes[node] for node in self.nodes),
key=lambda node: point.distance(node.point))
nodes[nearest_node.i] = (None, None)
else:
nodes = self.fallback_nodes
return nodes
def _geom_to_array(geom: BaseGeometry):
if isinstance(geom, geometry.Point):
yield np.array([(np.nan, GeomTypes.POINT)])
yield np.asarray(geom.coords)
elif isinstance(geom, geometry.LineString):
yield np.array([(np.nan, GeomTypes.LINESTRING)])
yield np.asarray(geom.coords)
elif isinstance(geom, geometry.Polygon):
for interior in geom.interiors:
yield np.array([(np.nan, GeomTypes.POLYGON_HOLE)])
yield np.asarray(interior)
yield np.array([(np.nan, GeomTypes.POLYGON_SHELL)])
yield np.asarray(geom.exterior)
elif isinstance(geom, BaseMultipartGeometry):
return chain.from_iterable(map(geom_to_array, geom))
else:
raise TypeError
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
height = self.height
width = self.width
domain = Polygon([(0, 0), (0, height), (width, height), (width, 0)])
target = LineString([(0, height / 2), (0, height)])
spawn = Polygon([(0, 0),
(0, height / 2),
(width / 2, height / 2),
(width / 2, 0)])
obstacles = LineString([(0, height / 2),
(width * self.ratio, height / 2)]) | \
domain.exterior - target
self.obstacles = obstacles
self.targets = [target]
self.spawns = [spawn]
self.domain = domain
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
width = self.width
height = self.height
exit_width = self.exit_width
exit_hall_width = self.exit_hall_width
self.room = LineString(
[(width, 0), (0, 0), (0, height), (width, height)])
self.hall = rectangle(width, (height + exit_width) / 2,
exit_hall_width, (height - exit_width) / 2) | \
rectangle(width, 0,
exit_hall_width, (height - exit_width) / 2)
target = LineString(
[(width + exit_hall_width, (height - exit_width) / 2),
(width + exit_hall_width, (height + exit_width) / 2)])
# Field attributes
self.obstacles = self.room | self.hall
self.targets = [target]
self.spawns = [self.room.convex_hull]
self.domain = self.convex_hull()
def add_geom(fig: Figure, geom: BaseGeometry, **kwargs):
"""Add Shapely geom into Bokeh plot.
Args:
fig (Figure):
geom (BaseGeometry):
"""
if isinstance(geom, Point):
fig.circle(*geom.xy, **kwargs)
elif isinstance(geom, LineString):
fig.line(*geom.xy, **kwargs)
elif isinstance(geom, Polygon):
fig.patch(*geom.exterior.xy, **kwargs)
elif isinstance(geom, BaseMultipartGeometry):
for item in geom:
add_geom(fig, item, **kwargs)
else:
raise TypeError('Object geom {geom} no instance of {types}.'.format(
geom=geom, types=BaseGeometry))
def test_linearize():
t_start = 1
t_stop = 6
xy = np.array(([1., 1.], [2., 2.], [3., 3.], [4., 4.], [5., 5.]))
time = np.array([0., 1., 2., 3., 4.])
pos = nept.Position(xy, time)
trajectory = [[0., 0.], [5., 5.], [10., 10.]]
line = LineString(trajectory)
zone_start = Point([1., 1.])
zone_stop = Point([9., 9.])
expand_by = 1
zone = nept.expand_line(zone_start, zone_stop, line, expand_by)
sliced_pos = pos[t_start:t_stop]
linear = sliced_pos.linearize(line, zone)
assert np.allclose(linear.x, np.array([2.82842712, 4.24264069, 5.65685425, 7.07106781]))
assert np.allclose(linear.time, [1, 2, 3, 4])
def test_position_linearize():
times = np.array([1.0, 2.0, 3.0])
data = np.array([[0.0, 0.5],
[0.5, 0.1],
[1.0, 1.2]])
pos = nept.Position(data, times)
line = LineString([(0.0, 0.0), (1.0, 1.0)])
zone_start = Point([1., 1.])
zone_stop = Point([9., 9.])
expand_by = 1
zone = nept.expand_line(zone_start, zone_stop, line, expand_by)
linear = pos.linearize(line, zone)
assert np.allclose(linear.x, np.array([0.35355339, 0.42426407, 1.41421356]))
assert np.allclose(linear.time, np.array([1., 2., 3.]))
def split(line, point):
distance_on_line = line.project(point)
coords = list(line.coords)
for i, p in enumerate(coords):
pd = line.project(geometry.Point(p))
if pd == distance_on_line:
return [
geometry.LineString(coords[:i+1]),
geometry.LineString(coords[i:])
]
elif distance_on_line < pd:
cp = line.interpolate(distance_on_line)
ls1_coords = coords[:i]
ls1_coords.append(cp.coords[0])
ls2_coords = [cp.coords[0]]
ls2_coords.extend(coords[i:])
return [geometry.LineString(ls1_coords),
geometry.LineString(ls2_coords)]
def generate_breaklines(self):
segments = {}
for i, edge in self.edge_values.iteritems():
segments[i] = {"geometry": LineString([(self.raw_indices[edge[0]].x, self.raw_indices[edge[0]].y, self.raw_indices[edge[0]].z),
(self.raw_indices[edge[1]].x, self.raw_indices[edge[1]].y, self.raw_indices[edge[1]].z,)]),
"linetype": "HARD" if edge[2] == 4 else "SOFT"}
# from shapely.ops import linemerge
#
# outsegs = {}
# i2 = 0
# for ltype in ['HARD', 'SOFT']:
# i2 = i2 + 1
# segs = [s['geometry'] for s in segments.itervalues() if s['linetype'] == ltype]
# for line in list(linemerge(segs)):
# outsegs[i2] = {'geometry':line, 'linetype': ltype}
return segments
def points_along_boundaries(geoseries, distance=1.0):
"""
Generate a shapely MultiPoint of point features along lines at a specified distance
:param geoseries:
:param distance:
:return:
"""
list_points = []
for line3d in iter_line(geoseries):
line = LineString([xy[0:2] for xy in list(line3d.coords)])
current_dist = distance
line_length = line.length
list_points.append(Point(list(line.coords)[0]))
while current_dist < line_length:
list_points.append(line.interpolate(current_dist))
current_dist += distance
list_points.append(Point(list(line.coords)[-1]))
return MultiPoint(list_points)
def split_line(line, max_line_units):
"""Checks the line's length and splits in half if larger than the configured max
:param line: Shapely line to be split
:param max_line_units: The maximum allowed length of the line
"""
if line.length <= max_line_units:
return [line]
half_length = line.length / 2
coords = list(line.coords)
for idx, point in enumerate(coords):
proj_dist = line.project(Point(point))
if proj_dist == half_length:
return [LineString(coords[:idx + 1]), LineString(coords[idx:])]
if proj_dist > half_length:
mid_point = line.interpolate(half_length)
head_line = LineString(coords[:idx] + [(mid_point.x, mid_point.y)])
tail_line = LineString([(mid_point.x, mid_point.y)] + coords[idx:])
return split_line(head_line, max_line_units) + split_line(tail_line, max_line_units)
def point_projects_to_edges(self, point, distance_tolerance=0.01):
"""
Project a point to graph edges considering specific distance tolerance.
Note the tolerance is measured by great circle distance to point per se.
:param point: a shapely Point instance or (lon, lat) tuple
:param distance_tolerance: tolerance of distance in km
:return: a list of projected edges, reversely sorted by offsets.
"""
point_buffer = distance_to_buffer(distance_tolerance)
p_buf = Point(point).buffer(point_buffer)
projected_edges = []
projected_segments = []
major = self.major_component()
for i in range(0, len(major)):
line_index = major[i]
line = self.geoms[line_index]
if line.intersects(p_buf):
cuts = self.line_cuts(line_index)
if cuts is None:
continue
for j in range(1, len(cuts)):
sinx = cuts[j - 1]
einx = cuts[j]
segment = line.coords[sinx:einx + 1]
ls = LineString(segment)
if ls.intersects(p_buf):
edge = self.edge_key(segment[0], segment[-1])
offset = ls.distance(Point(point)) # no buffer
projected_edges.append((edge, offset))
projected_segments.append(segment)
result = sorted(projected_edges, key=lambda x: x[1], reverse=True)
edges = list(set([i[0] for i in result]))
return edges, projected_segments
def intersection_shapely(self,geom):
traj = LineString(np.column_stack((self.x[:, np.newaxis], self.y[:, np.newaxis], self.seconds[:, np.newaxis])))
return traj.intersection(geom)
def boundary_shapely(self):
traj = LineString(np.column_stack((self.x[:, np.newaxis], self.y[:, np.newaxis], self.seconds[:, np.newaxis])))
return traj.boundary
def difference(self, geom):
traj = LineString(np.column_stack((self.x[:, np.newaxis], self.y[:, np.newaxis], self.seconds[:, np.newaxis])))
return traj.difference(geom)
def test_intersection_beirada(self):
polygon = Polygon([(1, 1), (1, 3), (4, 3), (4, 1), (1, 1)])
line = LineString([(1, 0, 1), (2, 2, 2), (3, 2, 3), (5, 4, 4)])
inter = line.intersection(polygon)
resultRight = LineString([(1.5, 1, 1.5), (2, 2, 2), (3, 2, 3), (4, 3, 3.5)])
assert (resultRight == inter)
def test_intersection_com_beirada_superior_e_inferior_esquerdo(self):
polygon = Polygon([(1, 1), (1, 3), (4, 3), (4, 1), (1, 1)])
line = LineString([(0, 0, 1), (2, 2, 2), (0, 4, 3)])
result = LineString([(1, 1, 1.5), (2, 2, 2), (1, 3, 2.5)])
inter = line.intersection(polygon)
assert (result == inter)
def test_intersection_sem_beirada_superior_e_inferior_esquerdo(self):
polygon = Polygon([(0.9, 1), (0.9, 2), (0.9, 3), (4, 3), (4, 1), (0.9, 1)])
line = LineString([(0, 0, 1), (2, 2, 2), (0, 4, 3)])
result = LineString([(1, 1, 1.5), (2, 2, 2), (1, 3, 2.5)])
inter = line.intersection(polygon)
assert (result == inter)
def test_intersection_quando_se_encontra_em_um_ponto_qualquer(self):
polygon = Polygon([(1, 1), (1, 2), (1, 3), (4, 3), (4, 1), (1, 1)])
line = LineString([(0, 0, 1), (2, 2, 2), (0, 2, 3)])
result = LineString([(1, 1, 1.5), (2, 2, 2), (1, 2, 2.5)])
inter = line.intersection(polygon)
assert (result == inter)
def test_intersection_verificar_se_interpolou_e_nao_pegou_o_mais_proximo(self):
polygon = Polygon([(1, 1), (1, 2), (1, 3), (4, 3), (4, 1), (1, 1)])
line = LineString([(0, 0, 1), (2, 2, 2), (0, 2, 3)])
inter = line.intersection(polygon)
time = np.array(inter)[:,2]
assert (time[0] != time[1])
assert (time[1] != time[2])
def test_intersection_verificar_se_interpolacao_retorna_o_valor_correto(self):
line = LineString([(0, 0, 1), (2, 2, 2), (0, 2, 3)])
polygon = Polygon([(1, 1), (1, 2), (1, 3), (4, 3), (4, 1), (1, 1)])
result = LineString([(1, 1, 1.5), (2, 2, 2), (1, 2, 2.5)])
inter = line.intersection(polygon)
resultTime = np.array(result)[:, 2]
time = np.array(inter)[:, 2]
assert (time[0] == resultTime[0])
assert (time[1] == resultTime[1])
assert (time[2] == resultTime[2])
def getPointlist(self):
"""Create a list of points with absolute positions and create a LineString instance. """
self.pointlist = []
for curve in self.curves:
self.pointlist.append(curve.start)
self.pointlist.append(curve.control1)
self.pointlist.append(curve.control2)
self.pointlist.append(curve.end)
self.linestring = geo.LineString(self.pointlist)