def testRuleBasedDispatcherNoLabels(self):
# prepare data for test
box1 = box(0, 0, 100, 100)
box2 = box(0, 0, 10, 10)
dispatcher = RuleBasedDispatcher([CatchAllRule()])
self.assertEqual(dispatcher.dispatch(None, box1), 0)
dispatch_batch = dispatcher.dispatch_batch(None, [box1, box2])
assert_array_equal(dispatch_batch, [0, 0])
labels, dispatch_map = dispatcher.dispatch_map(None, [box1, box2])
assert_array_equal(labels, dispatch_batch)
assert_array_equal(dispatch_batch, dispatch_map)
python类box()的实例源码
def testRuleBasedDispatcher(self):
# prepare data for test
box1 = box(0, 0, 100, 100)
box2 = box(0, 0, 10, 10)
dispatcher = RuleBasedDispatcher([CatchAllRule()], ["catchall"])
self.assertEqual(dispatcher.dispatch(None, box1), "catchall")
dispatch_batch = dispatcher.dispatch_batch(None, [box1, box2])
assert_array_equal(dispatch_batch, ["catchall", "catchall"])
labels, dispatch_map = dispatcher.dispatch_map(None, [box1, box2])
assert_array_equal(labels, dispatch_batch)
assert_array_equal(dispatch_map, [0, 0])
def testCustomDispatcher(self):
# prepare data for test
box1 = box(0, 0, 500, 500)
box2 = box(0, 0, 10, 10)
box3 = box(0, 0, 1000, 1000)
dispatcher = CustomDispatcher()
self.assertEqual(dispatcher.dispatch(None, box1), "BIG")
dispatch_batch = dispatcher.dispatch_batch(None, [box1, box2, box3])
assert_array_equal(dispatch_batch, ["BIG", "SMALL", "BIG"])
labels, dispatch_map = dispatcher.dispatch_map(None, [box1, box2, box3])
assert_array_equal(labels, dispatch_batch)
assert_array_equal(dispatch_map, [1, 0, 1])
def _init_polygon_mask(self, polygon_mask):
"""Sets the polygon mask taking into account parent and passed mask"""
parent_pmask = self._parent_polygon_mask(do_translate=True)
if polygon_mask is not None and parent_pmask is not None:
self._polygon_mask = polygon_mask.intersection(parent_pmask)
elif polygon_mask is not None:
self._polygon_mask = polygon_mask
elif parent_pmask is not None:
self._polygon_mask = box(0, 0, self.width, self.height).intersection(parent_pmask)
else:
self._polygon_mask = None
def __init__(self, min_x, min_y, max_x, max_y):
self.minx = min_x
self.miny = min_y
self.maxx = max_x
self.maxy = max_y
#The bounding boxes do NOT intersect if the other bounding box is
#entirely LEFT, BELOW, RIGHT, or ABOVE bounding box.
def TileId(self, y, x):
if (y < self.bbox.miny or x < self.bbox.minx or
y > self.bbox.maxy or x > self.bbox.maxx):
return -1
#Find the tileid by finding the latitude row and longitude column
return (self.Row(y) * self.ncolumns) + self.Col(x)
# Get the bounding box of the specified tile.
def BottomNeighbor(self, tileid):
return tileid if (tileid < self.ncolumns) else (tileid - self.ncolumns)
# Get the list of tiles that lie within the specified bounding box.
# The method finds the center tile and spirals out by finding neighbors
# and recursively checking if tile is inside and checking/adding
# neighboring tiles
def get_projects_geojson(search_bbox_dto: ProjectSearchBBoxDTO) -> geojson.FeatureCollection:
""" search for projects meeting criteria provided return as a geojson feature collection"""
# make a polygon from provided bounding box
polygon = ProjectSearchService._make_4326_polygon_from_bbox(search_bbox_dto.bbox, search_bbox_dto.input_srid)
# validate the bbox area is less than or equal to the max area allowed to prevent
# abuse of the api or performance issues from large requests
if not ProjectSearchService.validate_bbox_area(polygon):
raise BBoxTooBigError('Requested bounding box is too large')
# get projects intersecting the polygon for created by the author_id
intersecting_projects = ProjectSearchService._get_intersecting_projects(polygon, search_bbox_dto.project_author)
# allow an empty feature collection to be returned if no intersecting features found, since this is primarily
# for returning data to show on a map
features = []
for project in intersecting_projects:
try:
localDTO = ProjectInfo.get_dto_for_locale(project.id, search_bbox_dto.preferred_locale,
project.default_locale)
except Exception as e:
pass
properties = {
"projectId": project.id,
"projectStatus": ProjectStatus(project.status).name,
"projectName": localDTO.name
}
feature = geojson.Feature(geometry=geojson.loads(project.geometry), properties=properties)
features.append(feature)
return geojson.FeatureCollection(features)
def _make_4326_polygon_from_bbox(bbox: list, srid: int) -> Polygon:
""" make a shapely Polygon in SRID 4326 from bbox and srid"""
try:
polygon = box(bbox[0], bbox[1], bbox[2], bbox[3])
if not srid == 4326:
geometry = shape.from_shape(polygon, srid)
geom_4326 = db.engine.execute(ST_Transform(geometry, 4326)).scalar()
polygon = shape.to_shape(geom_4326)
except Exception as e:
raise ProjectSearchServiceError(f'error making polygon: {e}')
return polygon
def initialize_grid(self, geom):
"""
"""
bounds = geom.bounds
(minx, miny, maxx, maxy) = bounds
(minx, miny, maxx, maxy) = (
round(np.floor(minx * self.psi)) / self.psi,
round(np.floor(miny * self.psi)) / self.psi,
round(np.ceil(maxx * self.psi)) / self.psi,
round(np.ceil(maxy * self.psi)) / self.psi)
clean_bounds = (minx, miny, maxx, maxy)
top_left_lon = minx
top_left_lat = maxy
affine = Affine(self.pixel_size, 0, top_left_lon,
0, -self.pixel_size, top_left_lat)
# base_rasterize, base_bounds = self.rasterize_geom(geom)
# self.shape = base_rasterize.shape
nrows = int(np.ceil( (maxy - miny) / self.pixel_size ))
ncols = int(np.ceil( (maxx - minx) / self.pixel_size ))
self.shape = (nrows, ncols)
self.bounds = clean_bounds
self.affine = affine
self.topleft = (top_left_lon, top_left_lat)
self.grid_box = prep(box(*self.bounds))
# https://stackoverflow.com/questions/8090229/
# resize-with-averaging-or-rebin-a-numpy-2d-array/8090605#8090605
def _expand_bounds(self, bounds):
if bounds is None:
return bounds
min_tile_x, min_tile_y, max_tile_x, max_tile_y = self._tile_coords(bounds)
ul = box(*mercantile.xy_bounds(mercantile.Tile(z=self.zoom_level, x=min_tile_x, y=max_tile_y)))
lr = box(*mercantile.xy_bounds(mercantile.Tile(z=self.zoom_level, x=max_tile_x, y=min_tile_y)))
return ul.union(lr).bounds
def _tile_coords(self, bounds):
""" Convert tile coords mins/maxs to lng/lat bounds """
tfm = partial(pyproj.transform,
pyproj.Proj(init="epsg:3857"),
pyproj.Proj(init="epsg:4326"))
bounds = ops.transform(tfm, box(*bounds)).bounds
params = list(bounds) + [[self.zoom_level]]
tile_coords = [(tile.x, tile.y) for tile in mercantile.tiles(*params)]
xtiles, ytiles = zip(*tile_coords)
minx = min(xtiles)
maxx = max(xtiles)
miny = min(ytiles)
maxy = max(ytiles)
return minx, miny, maxx, maxy
def __new__(cls, access_token=os.environ.get("DG_MAPS_API_TOKEN"),
url="https://api.mapbox.com/v4/digitalglobe.nal0g75k/{z}/{x}/{y}.png",
zoom=22, **kwargs):
_tms_meta = TmsMeta(access_token=access_token, url=url, zoom=zoom, bounds=kwargs.get("bounds"))
self = super(TmsImage, cls).create(_tms_meta)
self._base_args = {"access_token": access_token, "url": url, "zoom": zoom}
self._tms_meta = _tms_meta
self.__geo_interface__ = mapping(box(*_tms_meta.bounds))
self.__geo_transform__ = _tms_meta.__geo_transform__
g = self._parse_geoms(**kwargs)
if g is not None:
return self[g]
else:
return self
def __getitem__(self, geometry):
if isinstance(geometry, BaseGeometry) or getattr(geometry, "__geo_interface__", None) is not None:
if self._tms_meta._bounds is None:
return self.aoi(geojson=mapping(geometry), from_proj=self.proj)
image = GeoImage.__getitem__(self, geometry)
image._tms_meta = self._tms_meta
return image
else:
result = super(TmsImage, self).__getitem__(geometry)
image = super(TmsImage, self.__class__).__new__(self.__class__,
result.dask, result.name, result.chunks,
result.dtype, result.shape)
if all([isinstance(e, slice) for e in geometry]) and len(geometry) == len(self.shape):
xmin, ymin, xmax, ymax = geometry[2].start, geometry[1].start, geometry[2].stop, geometry[1].stop
xmin = 0 if xmin is None else xmin
ymin = 0 if ymin is None else ymin
xmax = self.shape[2] if xmax is None else xmax
ymax = self.shape[1] if ymax is None else ymax
g = ops.transform(self.__geo_transform__.fwd, box(xmin, ymin, xmax, ymax))
image.__geo_interface__ = mapping(g)
image.__geo_transform__ = self.__geo_transform__ + (xmin, ymin)
else:
image.__geo_interface__ = self.__geo_interface__
image.__geo_transform__ = self.__geo_transform__
image._tms_meta = self._tms_meta
return image
def bounds(self):
""" Access the spatial bounding box of the image
Returns:
bounds (list): list of bounds in image projected coordinates (minx, miny, maxx, maxy)
"""
return shape(self).bounds
def _transpix(self, geometry, gsd, dem, proj):
xmin, ymin, xmax, ymax = geometry.bounds
x = np.linspace(xmin, xmax, num=int((xmax-xmin)/gsd))
y = np.linspace(ymax, ymin, num=int((ymax-ymin)/gsd))
xv, yv = np.meshgrid(x, y, indexing='xy')
if self.proj is None:
from_proj = "EPSG:4326"
else:
from_proj = self.proj
itfm = partial(pyproj.transform, pyproj.Proj(init=proj), pyproj.Proj(init=from_proj))
xv, yv = itfm(xv, yv) # if that works
if isinstance(dem, GeoImage):
g = box(xv.min(), yv.min(), xv.max(), yv.max())
try:
dem = dem[g].compute(get=dask.get) # read(quiet=True)
except AssertionError:
dem = 0 # guessing this is indexing by a 0 width geometry.
if isinstance(dem, np.ndarray):
dem = tf.resize(np.squeeze(dem), xv.shape, preserve_range=True, order=1, mode="edge")
return self.__geo_transform__.rev(xv, yv, z=dem, _type=np.float32)[::-1]
def __contains__(self, g):
geometry = ops.transform(self.__geo_transform__.rev, g)
img_bounds = box(0, 0, *self.shape[2:0:-1])
return img_bounds.contains(geometry)
def _image_by_type(cls, cat_id, **kwargs):
vectors = Vectors()
aoi = wkt.dumps(box(-180, -90, 180, 90))
query = "item_type:GBDXCatalogRecord AND attributes.catalogID:{}".format(cat_id)
query += " AND NOT item_type:IDAHOImage AND NOT item_type:DigitalGlobeAcquisition"
result = vectors.query(aoi, query=query, count=1)
if len(result) == 0:
raise Exception('Could not find a catalog entry for the given id: {}'.format(cat_id))
else:
return cls._image_class(cat_id, result[0], **kwargs)
def _build_standard_products(idaho_id, bbox, proj):
wkt = box(*bbox).wkt
dem = ipe.GeospatialCrop(ipe.IdahoRead(bucketName="idaho-dems", imageId=idaho_id, objectStore="S3"), geospatialWKT=str(wkt))
if proj is not "EPSG:4326":
dem = ipe.Reproject(dem, **reproject_params(proj))
return {
"dem": dem
}
def __getitem__(self, geometry):
if isinstance(geometry, BaseGeometry) or getattr(geometry, "__geo_interface__", None) is not None:
image = GeoImage.__getitem__(self, geometry)
image._ipe_op = self._ipe_op
return image
else:
result = super(IpeImage, self).__getitem__(geometry)
image = super(IpeImage, self.__class__).__new__(self.__class__,
result.dask, result.name, result.chunks,
result.dtype, result.shape)
if all([isinstance(e, slice) for e in geometry]) and len(geometry) == len(self.shape):
xmin, ymin, xmax, ymax = geometry[2].start, geometry[1].start, geometry[2].stop, geometry[1].stop
xmin = 0 if xmin is None else xmin
ymin = 0 if ymin is None else ymin
xmax = self.shape[2] if xmax is None else xmax
ymax = self.shape[1] if ymax is None else ymax
g = ops.transform(self.__geo_transform__.fwd, box(xmin, ymin, xmax, ymax))
image.__geo_interface__ = mapping(g)
image.__geo_transform__ = self.__geo_transform__ + (xmin, ymin)
else:
image.__geo_interface__ = self.__geo_interface__
image.__geo_transform__ = self.__geo_transform__
image._ipe_op = self._ipe_op
return image