def process_rhs(self, compiler, connection):
rhs, rhs_params = super(GISLookup, self).process_rhs(compiler, connection)
if hasattr(self.rhs, '_as_sql'):
# If rhs is some QuerySet, don't touch it
return rhs, rhs_params
geom = self.rhs
if isinstance(self.rhs, Col):
# Make sure the F Expression destination field exists, and
# set an `srid` attribute with the same as that of the
# destination.
geo_fld = self.rhs.output_field
if not hasattr(geo_fld, 'srid'):
raise ValueError('No geographic field found in expression.')
self.rhs.srid = geo_fld.srid
elif isinstance(self.rhs, Expression):
raise ValueError('Complex expressions not supported for GeometryField')
elif isinstance(self.rhs, (list, tuple)):
geom = self.rhs[0]
rhs = connection.ops.get_geom_placeholder(self.lhs.output_field, geom, compiler)
return rhs, rhs_params
python类Expression()的实例源码
def process_rhs(self, compiler, connection):
rhs, rhs_params = super(GISLookup, self).process_rhs(compiler, connection)
if hasattr(self.rhs, '_as_sql'):
# If rhs is some QuerySet, don't touch it
return rhs, rhs_params
geom = self.rhs
if isinstance(self.rhs, Col):
# Make sure the F Expression destination field exists, and
# set an `srid` attribute with the same as that of the
# destination.
geo_fld = self.rhs.output_field
if not hasattr(geo_fld, 'srid'):
raise ValueError('No geographic field found in expression.')
self.rhs.srid = geo_fld.srid
elif isinstance(self.rhs, Expression):
raise ValueError('Complex expressions not supported for GeometryField')
elif isinstance(self.rhs, (list, tuple)):
geom = self.rhs[0]
rhs = connection.ops.get_geom_placeholder(self.lhs.output_field, geom, compiler)
return rhs, rhs_params
def process_rhs(self, compiler, connection):
rhs, rhs_params = super(GISLookup, self).process_rhs(compiler, connection)
if hasattr(self.rhs, '_as_sql'):
# If rhs is some QuerySet, don't touch it
return rhs, rhs_params
geom = self.rhs
if isinstance(self.rhs, Col):
# Make sure the F Expression destination field exists, and
# set an `srid` attribute with the same as that of the
# destination.
geo_fld = self.rhs.output_field
if not hasattr(geo_fld, 'srid'):
raise ValueError('No geographic field found in expression.')
self.rhs.srid = geo_fld.srid
elif isinstance(self.rhs, Expression):
raise ValueError('Complex expressions not supported for GeometryField')
elif isinstance(self.rhs, (list, tuple)):
geom = self.rhs[0]
rhs = connection.ops.get_geom_placeholder(self.lhs.output_field, geom, compiler)
return rhs, rhs_params
def process_rhs(self, compiler, connection):
rhs, rhs_params = super(GISLookup, self).process_rhs(compiler, connection)
if hasattr(self.rhs, '_as_sql'):
# If rhs is some QuerySet, don't touch it
return rhs, rhs_params
geom = self.rhs
if isinstance(self.rhs, Col):
# Make sure the F Expression destination field exists, and
# set an `srid` attribute with the same as that of the
# destination.
geo_fld = self.rhs.output_field
if not hasattr(geo_fld, 'srid'):
raise ValueError('No geographic field found in expression.')
self.rhs.srid = geo_fld.srid
elif isinstance(self.rhs, Expression):
raise ValueError('Complex expressions not supported for GeometryField')
elif isinstance(self.rhs, (list, tuple)):
geom = self.rhs[0]
rhs = connection.ops.get_geom_placeholder(self.lhs.output_field, geom, compiler)
return rhs, rhs_params
def process_rhs(self, compiler, connection):
rhs, rhs_params = super(GISLookup, self).process_rhs(compiler, connection)
if hasattr(self.rhs, '_as_sql'):
# If rhs is some QuerySet, don't touch it
return rhs, rhs_params
geom = self.rhs
if isinstance(self.rhs, Col):
# Make sure the F Expression destination field exists, and
# set an `srid` attribute with the same as that of the
# destination.
geo_fld = self.rhs.output_field
if not hasattr(geo_fld, 'srid'):
raise ValueError('No geographic field found in expression.')
self.rhs.srid = geo_fld.srid
elif isinstance(self.rhs, Expression):
raise ValueError('Complex expressions not supported for GeometryField')
elif isinstance(self.rhs, (list, tuple)):
geom = self.rhs[0]
rhs = connection.ops.get_geom_placeholder(self.lhs.output_field, geom, compiler)
return rhs, rhs_params
def process_rhs(self, compiler, connection):
rhs, rhs_params = super(GISLookup, self).process_rhs(compiler, connection)
if hasattr(self.rhs, '_as_sql'):
# If rhs is some QuerySet, don't touch it
return rhs, rhs_params
geom = self.rhs
if isinstance(self.rhs, Col):
# Make sure the F Expression destination field exists, and
# set an `srid` attribute with the same as that of the
# destination.
geo_fld = self.rhs.output_field
if not hasattr(geo_fld, 'srid'):
raise ValueError('No geographic field found in expression.')
self.rhs.srid = geo_fld.srid
elif isinstance(self.rhs, Expression):
raise ValueError('Complex expressions not supported for GeometryField')
elif isinstance(self.rhs, (list, tuple)):
geom = self.rhs[0]
rhs = connection.ops.get_geom_placeholder(self.lhs.output_field, geom, compiler)
return rhs, rhs_params
def process_rhs(self, compiler, connection):
rhs, rhs_params = super(GISLookup, self).process_rhs(compiler, connection)
if hasattr(self.rhs, '_as_sql'):
# If rhs is some QuerySet, don't touch it
return rhs, rhs_params
geom = self.rhs
if isinstance(self.rhs, Col):
# Make sure the F Expression destination field exists, and
# set an `srid` attribute with the same as that of the
# destination.
geo_fld = self.rhs.output_field
if not hasattr(geo_fld, 'srid'):
raise ValueError('No geographic field found in expression.')
self.rhs.srid = geo_fld.srid
elif isinstance(self.rhs, Expression):
raise ValueError('Complex expressions not supported for GeometryField')
elif isinstance(self.rhs, (list, tuple)):
geom = self.rhs[0]
rhs = connection.ops.get_geom_placeholder(self.lhs.output_field, geom, compiler)
return rhs, rhs_params
def process_rhs(self, compiler, connection):
rhs, rhs_params = super(GISLookup, self).process_rhs(compiler, connection)
if hasattr(self.rhs, '_as_sql'):
# If rhs is some QuerySet, don't touch it
return rhs, rhs_params
geom = self.rhs
if isinstance(self.rhs, Col):
# Make sure the F Expression destination field exists, and
# set an `srid` attribute with the same as that of the
# destination.
geo_fld = self.rhs.output_field
if not hasattr(geo_fld, 'srid'):
raise ValueError('No geographic field found in expression.')
self.rhs.srid = geo_fld.srid
elif isinstance(self.rhs, Expression):
raise ValueError('Complex expressions not supported for GeometryField')
elif isinstance(self.rhs, (list, tuple)):
geom = self.rhs[0]
rhs = connection.ops.get_geom_placeholder(self.lhs.output_field, geom, compiler)
return rhs, rhs_params
def process_rhs(self, compiler, connection):
rhs, rhs_params = super(GISLookup, self).process_rhs(compiler, connection)
if hasattr(self.rhs, '_as_sql'):
# If rhs is some QuerySet, don't touch it
return rhs, rhs_params
geom = self.rhs
if isinstance(self.rhs, Col):
# Make sure the F Expression destination field exists, and
# set an `srid` attribute with the same as that of the
# destination.
geo_fld = self.rhs.output_field
if not hasattr(geo_fld, 'srid'):
raise ValueError('No geographic field found in expression.')
self.rhs.srid = geo_fld.srid
elif isinstance(self.rhs, Expression):
raise ValueError('Complex expressions not supported for GeometryField')
elif isinstance(self.rhs, (list, tuple)):
geom = self.rhs[0]
rhs = connection.ops.get_geom_placeholder(self.lhs.output_field, geom, compiler)
return rhs, rhs_params
def process_rhs(self, compiler, connection):
rhs, rhs_params = super(GISLookup, self).process_rhs(compiler, connection)
if hasattr(self.rhs, '_as_sql'):
# If rhs is some QuerySet, don't touch it
return rhs, rhs_params
geom = self.rhs
if isinstance(self.rhs, Col):
# Make sure the F Expression destination field exists, and
# set an `srid` attribute with the same as that of the
# destination.
geo_fld = self.rhs.output_field
if not hasattr(geo_fld, 'srid'):
raise ValueError('No geographic field found in expression.')
self.rhs.srid = geo_fld.srid
elif isinstance(self.rhs, Expression):
raise ValueError('Complex expressions not supported for GeometryField')
elif isinstance(self.rhs, (list, tuple)):
geom = self.rhs[0]
rhs = connection.ops.get_geom_placeholder(self.lhs.output_field, geom, compiler)
return rhs, rhs_params
def process_rhs(self, compiler, connection):
rhs, rhs_params = super(GISLookup, self).process_rhs(compiler, connection)
if hasattr(self.rhs, '_as_sql'):
# If rhs is some QuerySet, don't touch it
return rhs, rhs_params
geom = self.rhs
if isinstance(self.rhs, Col):
# Make sure the F Expression destination field exists, and
# set an `srid` attribute with the same as that of the
# destination.
geo_fld = self.rhs.output_field
if not hasattr(geo_fld, 'srid'):
raise ValueError('No geographic field found in expression.')
self.rhs.srid = geo_fld.srid
elif isinstance(self.rhs, Expression):
raise ValueError('Complex expressions not supported for GeometryField')
elif isinstance(self.rhs, (list, tuple)):
geom = self.rhs[0]
rhs = connection.ops.get_geom_placeholder(self.lhs.output_field, geom, compiler)
return rhs, rhs_params
def process_rhs(self, compiler, connection):
rhs, rhs_params = super(GISLookup, self).process_rhs(compiler, connection)
if hasattr(self.rhs, '_as_sql'):
# If rhs is some QuerySet, don't touch it
return rhs, rhs_params
geom = self.rhs
if isinstance(self.rhs, Col):
# Make sure the F Expression destination field exists, and
# set an `srid` attribute with the same as that of the
# destination.
geo_fld = self.rhs.output_field
if not hasattr(geo_fld, 'srid'):
raise ValueError('No geographic field found in expression.')
self.rhs.srid = geo_fld.srid
elif isinstance(self.rhs, Expression):
raise ValueError('Complex expressions not supported for GeometryField')
elif isinstance(self.rhs, (list, tuple)):
geom = self.rhs[0]
rhs = connection.ops.get_geom_placeholder(self.lhs.output_field, geom, compiler)
return rhs, rhs_params
def process_rhs(self, compiler, connection):
rhs, rhs_params = super(GISLookup, self).process_rhs(compiler, connection)
if hasattr(self.rhs, '_as_sql'):
# If rhs is some QuerySet, don't touch it
return rhs, rhs_params
geom = self.rhs
if isinstance(self.rhs, Col):
# Make sure the F Expression destination field exists, and
# set an `srid` attribute with the same as that of the
# destination.
geo_fld = self.rhs.output_field
if not hasattr(geo_fld, 'srid'):
raise ValueError('No geographic field found in expression.')
self.rhs.srid = geo_fld.srid
elif isinstance(self.rhs, Expression):
raise ValueError('Complex expressions not supported for GeometryField')
elif isinstance(self.rhs, (list, tuple)):
geom = self.rhs[0]
rhs = connection.ops.get_geom_placeholder(self.lhs.output_field, geom, compiler)
return rhs, rhs_params
def process_rhs(self, compiler, connection):
rhs, rhs_params = super(GISLookup, self).process_rhs(compiler, connection)
if hasattr(self.rhs, '_as_sql'):
# If rhs is some QuerySet, don't touch it
return rhs, rhs_params
geom = self.rhs
if isinstance(self.rhs, Col):
# Make sure the F Expression destination field exists, and
# set an `srid` attribute with the same as that of the
# destination.
geo_fld = self.rhs.output_field
if not hasattr(geo_fld, 'srid'):
raise ValueError('No geographic field found in expression.')
self.rhs.srid = geo_fld.srid
elif isinstance(self.rhs, Expression):
raise ValueError('Complex expressions not supported for GeometryField')
elif isinstance(self.rhs, (list, tuple)):
geom = self.rhs[0]
rhs = connection.ops.get_geom_placeholder(self.lhs.output_field, geom, compiler)
return rhs, rhs_params
def get_prep_value(self, value):
"""
Spatial lookup values are either a parameter that is (or may be
converted to) a geometry, or a sequence of lookup values that
begins with a geometry. This routine will setup the geometry
value properly, and preserve any other lookup parameters before
returning to the caller.
"""
value = super(GeometryField, self).get_prep_value(value)
if isinstance(value, Expression):
return value
elif isinstance(value, (tuple, list)):
geom = value[0]
seq_value = True
else:
geom = value
seq_value = False
# When the input is not a GEOS geometry, attempt to construct one
# from the given string input.
if isinstance(geom, Geometry):
pass
elif isinstance(geom, (bytes, six.string_types)) or hasattr(geom, '__geo_interface__'):
try:
geom = Geometry(geom)
except GeometryException:
raise ValueError('Could not create geometry from lookup value.')
else:
raise ValueError('Cannot use object with type %s for a geometry lookup parameter.' % type(geom).__name__)
# Assigning the SRID value.
geom.srid = self.get_srid(geom)
if seq_value:
lookup_val = [geom]
lookup_val.extend(value[1:])
return tuple(lookup_val)
else:
return geom
def get_db_prep_lookup(self, lookup_type, value, connection, prepared=False):
"""
Prepare for the database lookup, and return any spatial parameters
necessary for the query. This includes wrapping any geometry
parameters with a backend-specific adapter and formatting any distance
parameters into the correct units for the coordinate system of the
field.
"""
# special case for isnull lookup
if lookup_type == 'isnull':
return []
elif lookup_type in self.class_lookups:
# Populating the parameters list, and wrapping the Geometry
# with the Adapter of the spatial backend.
if isinstance(value, (tuple, list)):
params = [connection.ops.Adapter(value[0])]
if self.class_lookups[lookup_type].distance:
# Getting the distance parameter in the units of the field.
params += self.get_distance(value[1:], lookup_type, connection)
elif lookup_type in connection.ops.truncate_params:
# Lookup is one where SQL parameters aren't needed from the
# given lookup value.
pass
else:
params += value[1:]
elif isinstance(value, Expression):
params = []
else:
params = [connection.ops.Adapter(value)]
return params
else:
raise ValueError('%s is not a valid spatial lookup for %s.' %
(lookup_type, self.__class__.__name__))
def process_rhs(self, compiler, connection):
if hasattr(self.rhs, '_as_sql'):
# If rhs is some QuerySet, don't touch it
return super(GISLookup, self).process_rhs(compiler, connection)
geom = self.rhs
if isinstance(self.rhs, Col):
# Make sure the F Expression destination field exists, and
# set an `srid` attribute with the same as that of the
# destination.
geo_fld = self.rhs.output_field
if not hasattr(geo_fld, 'srid'):
raise ValueError('No geographic field found in expression.')
self.rhs.srid = geo_fld.srid
elif isinstance(self.rhs, Expression):
raise ValueError('Complex expressions not supported for spatial fields.')
elif isinstance(self.rhs, (list, tuple)):
geom = self.rhs[0]
# Check if a band index was passed in the query argument.
if ((len(self.rhs) == 2 and not self.lookup_name == 'relate') or
(len(self.rhs) == 3 and self.lookup_name == 'relate')):
self.process_band_indices()
elif len(self.rhs) > 2:
raise ValueError('Tuple too long for lookup %s.' % self.lookup_name)
elif isinstance(self.lhs, RasterBandTransform):
self.process_band_indices(only_lhs=True)
rhs, rhs_params = super(GISLookup, self).process_rhs(compiler, connection)
rhs = connection.ops.get_geom_placeholder(self.lhs.output_field, geom, compiler)
return rhs, rhs_params
def process_rhs(self, compiler, connection):
if isinstance(self.rhs, Query):
# If rhs is some Query, don't touch it.
return super(GISLookup, self).process_rhs(compiler, connection)
geom = self.rhs
if isinstance(self.rhs, Col):
# Make sure the F Expression destination field exists, and
# set an `srid` attribute with the same as that of the
# destination.
geo_fld = self.rhs.output_field
if not hasattr(geo_fld, 'srid'):
raise ValueError('No geographic field found in expression.')
self.rhs.srid = geo_fld.srid
elif isinstance(self.rhs, Expression):
raise ValueError('Complex expressions not supported for spatial fields.')
elif isinstance(self.rhs, (list, tuple)):
geom = self.rhs[0]
# Check if a band index was passed in the query argument.
if ((len(self.rhs) == 2 and not self.lookup_name == 'relate') or
(len(self.rhs) == 3 and self.lookup_name == 'relate')):
self.process_band_indices()
elif len(self.rhs) > 2:
raise ValueError('Tuple too long for lookup %s.' % self.lookup_name)
elif isinstance(self.lhs, RasterBandTransform):
self.process_band_indices(only_lhs=True)
rhs, rhs_params = super(GISLookup, self).process_rhs(compiler, connection)
rhs = connection.ops.get_geom_placeholder(self.lhs.output_field, geom, compiler)
return rhs, rhs_params
def process_rhs(self, compiler, connection):
if isinstance(self.rhs, Query):
# If rhs is some Query, don't touch it.
return super(GISLookup, self).process_rhs(compiler, connection)
geom = self.rhs
if isinstance(self.rhs, Col):
# Make sure the F Expression destination field exists, and
# set an `srid` attribute with the same as that of the
# destination.
geo_fld = self.rhs.output_field
if not hasattr(geo_fld, 'srid'):
raise ValueError('No geographic field found in expression.')
self.rhs.srid = geo_fld.srid
elif isinstance(self.rhs, Expression):
raise ValueError('Complex expressions not supported for spatial fields.')
elif isinstance(self.rhs, (list, tuple)):
geom = self.rhs[0]
# Check if a band index was passed in the query argument.
if ((len(self.rhs) == 2 and not self.lookup_name == 'relate') or
(len(self.rhs) == 3 and self.lookup_name == 'relate')):
self.process_band_indices()
elif len(self.rhs) > 2:
raise ValueError('Tuple too long for lookup %s.' % self.lookup_name)
elif isinstance(self.lhs, RasterBandTransform):
self.process_band_indices(only_lhs=True)
rhs, rhs_params = super(GISLookup, self).process_rhs(compiler, connection)
rhs = connection.ops.get_geom_placeholder(self.lhs.output_field, geom, compiler)
return rhs, rhs_params
def process_rhs(self, compiler, connection):
if hasattr(self.rhs, '_as_sql'):
# If rhs is some QuerySet, don't touch it
return super(GISLookup, self).process_rhs(compiler, connection)
geom = self.rhs
if isinstance(self.rhs, Col):
# Make sure the F Expression destination field exists, and
# set an `srid` attribute with the same as that of the
# destination.
geo_fld = self.rhs.output_field
if not hasattr(geo_fld, 'srid'):
raise ValueError('No geographic field found in expression.')
self.rhs.srid = geo_fld.srid
elif isinstance(self.rhs, Expression):
raise ValueError('Complex expressions not supported for spatial fields.')
elif isinstance(self.rhs, (list, tuple)):
geom = self.rhs[0]
# Check if a band index was passed in the query argument.
if ((len(self.rhs) == 2 and not self.lookup_name == 'relate') or
(len(self.rhs) == 3 and self.lookup_name == 'relate')):
self.process_band_indices()
elif len(self.rhs) > 2:
raise ValueError('Tuple too long for lookup %s.' % self.lookup_name)
elif isinstance(self.lhs, RasterBandTransform):
self.process_band_indices(only_lhs=True)
rhs, rhs_params = super(GISLookup, self).process_rhs(compiler, connection)
rhs = connection.ops.get_geom_placeholder(self.lhs.output_field, geom, compiler)
return rhs, rhs_params
def get_prep_value(self, value):
"""
Spatial lookup values are either a parameter that is (or may be
converted to) a geometry, or a sequence of lookup values that
begins with a geometry. This routine will setup the geometry
value properly, and preserve any other lookup parameters before
returning to the caller.
"""
value = super(GeometryField, self).get_prep_value(value)
if isinstance(value, Expression):
return value
elif isinstance(value, (tuple, list)):
geom = value[0]
seq_value = True
else:
geom = value
seq_value = False
# When the input is not a GEOS geometry, attempt to construct one
# from the given string input.
if isinstance(geom, Geometry):
pass
elif isinstance(geom, (bytes, six.string_types)) or hasattr(geom, '__geo_interface__'):
try:
geom = Geometry(geom)
except GeometryException:
raise ValueError('Could not create geometry from lookup value.')
else:
raise ValueError('Cannot use object with type %s for a geometry lookup parameter.' % type(geom).__name__)
# Assigning the SRID value.
geom.srid = self.get_srid(geom)
if seq_value:
lookup_val = [geom]
lookup_val.extend(value[1:])
return tuple(lookup_val)
else:
return geom
def get_db_prep_lookup(self, lookup_type, value, connection, prepared=False):
"""
Prepare for the database lookup, and return any spatial parameters
necessary for the query. This includes wrapping any geometry
parameters with a backend-specific adapter and formatting any distance
parameters into the correct units for the coordinate system of the
field.
"""
# special case for isnull lookup
if lookup_type == 'isnull':
return []
elif lookup_type in self.class_lookups:
# Populating the parameters list, and wrapping the Geometry
# with the Adapter of the spatial backend.
if isinstance(value, (tuple, list)):
params = [connection.ops.Adapter(value[0])]
if self.class_lookups[lookup_type].distance:
# Getting the distance parameter in the units of the field.
params += self.get_distance(value[1:], lookup_type, connection)
elif lookup_type in connection.ops.truncate_params:
# Lookup is one where SQL parameters aren't needed from the
# given lookup value.
pass
else:
params += value[1:]
elif isinstance(value, Expression):
params = []
else:
params = [connection.ops.Adapter(value)]
return params
else:
raise ValueError('%s is not a valid spatial lookup for %s.' %
(lookup_type, self.__class__.__name__))
def get_prep_value(self, value):
"""
Spatial lookup values are either a parameter that is (or may be
converted to) a geometry, or a sequence of lookup values that
begins with a geometry. This routine will setup the geometry
value properly, and preserve any other lookup parameters before
returning to the caller.
"""
value = super(GeometryField, self).get_prep_value(value)
if isinstance(value, Expression):
return value
elif isinstance(value, (tuple, list)):
geom = value[0]
seq_value = True
else:
geom = value
seq_value = False
# When the input is not a GEOS geometry, attempt to construct one
# from the given string input.
if isinstance(geom, Geometry):
pass
elif isinstance(geom, (bytes, six.string_types)) or hasattr(geom, '__geo_interface__'):
try:
geom = Geometry(geom)
except GeometryException:
raise ValueError('Could not create geometry from lookup value.')
else:
raise ValueError('Cannot use object with type %s for a geometry lookup parameter.' % type(geom).__name__)
# Assigning the SRID value.
geom.srid = self.get_srid(geom)
if seq_value:
lookup_val = [geom]
lookup_val.extend(value[1:])
return tuple(lookup_val)
else:
return geom
def get_db_prep_lookup(self, lookup_type, value, connection, prepared=False):
"""
Prepare for the database lookup, and return any spatial parameters
necessary for the query. This includes wrapping any geometry
parameters with a backend-specific adapter and formatting any distance
parameters into the correct units for the coordinate system of the
field.
"""
# special case for isnull lookup
if lookup_type == 'isnull':
return []
elif lookup_type in self.class_lookups:
# Populating the parameters list, and wrapping the Geometry
# with the Adapter of the spatial backend.
if isinstance(value, (tuple, list)):
params = [connection.ops.Adapter(value[0])]
if self.class_lookups[lookup_type].distance:
# Getting the distance parameter in the units of the field.
params += self.get_distance(value[1:], lookup_type, connection)
elif lookup_type in connection.ops.truncate_params:
# Lookup is one where SQL parameters aren't needed from the
# given lookup value.
pass
else:
params += value[1:]
elif isinstance(value, Expression):
params = []
else:
params = [connection.ops.Adapter(value)]
return params
else:
raise ValueError('%s is not a valid spatial lookup for %s.' %
(lookup_type, self.__class__.__name__))
def get_prep_value(self, value):
"""
Spatial lookup values are either a parameter that is (or may be
converted to) a geometry, or a sequence of lookup values that
begins with a geometry. This routine will setup the geometry
value properly, and preserve any other lookup parameters before
returning to the caller.
"""
value = super(GeometryField, self).get_prep_value(value)
if isinstance(value, Expression):
return value
elif isinstance(value, (tuple, list)):
geom = value[0]
seq_value = True
else:
geom = value
seq_value = False
# When the input is not a GEOS geometry, attempt to construct one
# from the given string input.
if isinstance(geom, Geometry):
pass
elif isinstance(geom, (bytes, six.string_types)) or hasattr(geom, '__geo_interface__'):
try:
geom = Geometry(geom)
except GeometryException:
raise ValueError('Could not create geometry from lookup value.')
else:
raise ValueError('Cannot use object with type %s for a geometry lookup parameter.' % type(geom).__name__)
# Assigning the SRID value.
geom.srid = self.get_srid(geom)
if seq_value:
lookup_val = [geom]
lookup_val.extend(value[1:])
return tuple(lookup_val)
else:
return geom
def get_db_prep_lookup(self, lookup_type, value, connection, prepared=False):
"""
Prepare for the database lookup, and return any spatial parameters
necessary for the query. This includes wrapping any geometry
parameters with a backend-specific adapter and formatting any distance
parameters into the correct units for the coordinate system of the
field.
"""
# special case for isnull lookup
if lookup_type == 'isnull':
return []
elif lookup_type in self.class_lookups:
# Populating the parameters list, and wrapping the Geometry
# with the Adapter of the spatial backend.
if isinstance(value, (tuple, list)):
params = [connection.ops.Adapter(value[0])]
if self.class_lookups[lookup_type].distance:
# Getting the distance parameter in the units of the field.
params += self.get_distance(value[1:], lookup_type, connection)
elif lookup_type in connection.ops.truncate_params:
# Lookup is one where SQL parameters aren't needed from the
# given lookup value.
pass
else:
params += value[1:]
elif isinstance(value, Expression):
params = []
else:
params = [connection.ops.Adapter(value)]
return params
else:
raise ValueError('%s is not a valid spatial lookup for %s.' %
(lookup_type, self.__class__.__name__))
def get_prep_value(self, value):
"""
Spatial lookup values are either a parameter that is (or may be
converted to) a geometry, or a sequence of lookup values that
begins with a geometry. This routine will setup the geometry
value properly, and preserve any other lookup parameters before
returning to the caller.
"""
value = super(GeometryField, self).get_prep_value(value)
if isinstance(value, Expression):
return value
elif isinstance(value, (tuple, list)):
geom = value[0]
seq_value = True
else:
geom = value
seq_value = False
# When the input is not a GEOS geometry, attempt to construct one
# from the given string input.
if isinstance(geom, Geometry):
pass
elif isinstance(geom, (bytes, six.string_types)) or hasattr(geom, '__geo_interface__'):
try:
geom = Geometry(geom)
except GeometryException:
raise ValueError('Could not create geometry from lookup value.')
else:
raise ValueError('Cannot use object with type %s for a geometry lookup parameter.' % type(geom).__name__)
# Assigning the SRID value.
geom.srid = self.get_srid(geom)
if seq_value:
lookup_val = [geom]
lookup_val.extend(value[1:])
return tuple(lookup_val)
else:
return geom
def get_db_prep_lookup(self, lookup_type, value, connection, prepared=False):
"""
Prepare for the database lookup, and return any spatial parameters
necessary for the query. This includes wrapping any geometry
parameters with a backend-specific adapter and formatting any distance
parameters into the correct units for the coordinate system of the
field.
"""
# special case for isnull lookup
if lookup_type == 'isnull':
return []
elif lookup_type in self.class_lookups:
# Populating the parameters list, and wrapping the Geometry
# with the Adapter of the spatial backend.
if isinstance(value, (tuple, list)):
params = [connection.ops.Adapter(value[0])]
if self.class_lookups[lookup_type].distance:
# Getting the distance parameter in the units of the field.
params += self.get_distance(value[1:], lookup_type, connection)
elif lookup_type in connection.ops.truncate_params:
# Lookup is one where SQL parameters aren't needed from the
# given lookup value.
pass
else:
params += value[1:]
elif isinstance(value, Expression):
params = []
else:
params = [connection.ops.Adapter(value)]
return params
else:
raise ValueError('%s is not a valid spatial lookup for %s.' %
(lookup_type, self.__class__.__name__))
def get_prep_value(self, value):
"""
Spatial lookup values are either a parameter that is (or may be
converted to) a geometry, or a sequence of lookup values that
begins with a geometry. This routine will setup the geometry
value properly, and preserve any other lookup parameters before
returning to the caller.
"""
value = super(GeometryField, self).get_prep_value(value)
if isinstance(value, Expression):
return value
elif isinstance(value, (tuple, list)):
geom = value[0]
seq_value = True
else:
geom = value
seq_value = False
# When the input is not a GEOS geometry, attempt to construct one
# from the given string input.
if isinstance(geom, Geometry):
pass
elif isinstance(geom, (bytes, six.string_types)) or hasattr(geom, '__geo_interface__'):
try:
geom = Geometry(geom)
except GeometryException:
raise ValueError('Could not create geometry from lookup value.')
else:
raise ValueError('Cannot use object with type %s for a geometry lookup parameter.' % type(geom).__name__)
# Assigning the SRID value.
geom.srid = self.get_srid(geom)
if seq_value:
lookup_val = [geom]
lookup_val.extend(value[1:])
return tuple(lookup_val)
else:
return geom
def get_db_prep_lookup(self, lookup_type, value, connection, prepared=False):
"""
Prepare for the database lookup, and return any spatial parameters
necessary for the query. This includes wrapping any geometry
parameters with a backend-specific adapter and formatting any distance
parameters into the correct units for the coordinate system of the
field.
"""
# special case for isnull lookup
if lookup_type == 'isnull':
return []
elif lookup_type in self.class_lookups:
# Populating the parameters list, and wrapping the Geometry
# with the Adapter of the spatial backend.
if isinstance(value, (tuple, list)):
params = [connection.ops.Adapter(value[0])]
if self.class_lookups[lookup_type].distance:
# Getting the distance parameter in the units of the field.
params += self.get_distance(value[1:], lookup_type, connection)
elif lookup_type in connection.ops.truncate_params:
# Lookup is one where SQL parameters aren't needed from the
# given lookup value.
pass
else:
params += value[1:]
elif isinstance(value, Expression):
params = []
else:
params = [connection.ops.Adapter(value)]
return params
else:
raise ValueError('%s is not a valid spatial lookup for %s.' %
(lookup_type, self.__class__.__name__))