def batch_process_rhs(self, compiler, connection, rhs=None):
if rhs is None:
rhs = self.rhs
if self.bilateral_transforms:
sqls, sqls_params = [], []
for p in rhs:
value = Value(p, output_field=self.lhs.output_field)
value = self.apply_bilateral_transforms(value)
value = value.resolve_expression(compiler.query)
sql, sql_params = compiler.compile(value)
sqls.append(sql)
sqls_params.extend(sql_params)
else:
_, params = self.get_db_prep_lookup(rhs, connection)
sqls, sqls_params = ['%s'] * len(params), params
return sqls, sqls_params
python类Value()的实例源码
def process_rhs(self, compiler, connection):
value = self.rhs
if self.bilateral_transforms:
if self.rhs_is_direct_value():
# Do not call get_db_prep_lookup here as the value will be
# transformed before being used for lookup
value = Value(value, output_field=self.lhs.output_field)
value = self.apply_bilateral_transforms(value)
value = value.resolve_expression(compiler.query)
# Due to historical reasons there are a couple of different
# ways to produce sql here. get_compiler is likely a Query
# instance and as_sql just something with as_sql. Finally the value
# can of course be just plain Python value.
if hasattr(value, 'get_compiler'):
value = value.get_compiler(connection=connection)
if hasattr(value, 'as_sql'):
sql, params = compiler.compile(value)
return '(' + sql + ')', params
else:
return self.get_db_prep_lookup(value, connection)
def as_postgresql(self, compiler, connection):
geo_field = GeometryField(srid=self.srid) # Fake field to get SRID info
src_field = self.get_source_fields()[0]
geography = src_field.geography and self.srid == 4326
if geography:
# Set parameters as geography if base field is geography
for pos, expr in enumerate(
self.source_expressions[self.geom_param_pos + 1:], start=self.geom_param_pos + 1):
if isinstance(expr, GeomValue):
expr.geography = True
elif geo_field.geodetic(connection):
# Geometry fields with geodetic (lon/lat) coordinates need special distance functions
if self.spheroid:
self.function = 'ST_Distance_Spheroid' # More accurate, resource intensive
# Replace boolean param by the real spheroid of the base field
self.source_expressions[2] = Value(geo_field._spheroid)
else:
self.function = 'ST_Distance_Sphere'
return super(Distance, self).as_sql(compiler, connection)
def as_postgresql(self, compiler, connection):
geo_field = GeometryField(srid=self.srid) # Fake field to get SRID info
src_field = self.get_source_fields()[0]
geography = src_field.geography and self.srid == 4326
if geography:
# Set parameters as geography if base field is geography
for pos, expr in enumerate(
self.source_expressions[self.geom_param_pos + 1:], start=self.geom_param_pos + 1):
if isinstance(expr, GeomValue):
expr.geography = True
elif geo_field.geodetic(connection):
# Geometry fields with geodetic (lon/lat) coordinates need special distance functions
if self.spheroid:
self.function = 'ST_Distance_Spheroid' # More accurate, resource intensive
# Replace boolean param by the real spheroid of the base field
self.source_expressions[2] = Value(geo_field._spheroid)
else:
self.function = 'ST_Distance_Sphere'
return super(Distance, self).as_sql(compiler, connection)
def __init__(self, expression, pos, length=None, **extra):
"""
expression: the name of a field, or an expression returning a string
pos: an integer > 0, or an expression returning an integer
length: an optional number of characters to return
"""
if not hasattr(pos, 'resolve_expression'):
if pos < 1:
raise ValueError("'pos' must be greater than 0")
pos = Value(pos)
expressions = [expression, pos]
if length is not None:
if not hasattr(length, 'resolve_expression'):
length = Value(length)
expressions.append(length)
super(Substr, self).__init__(*expressions, **extra)
def with_status_name(self):
case = Case(output_field=models.CharField())
for status_value in self.model.STATUS._db_values:
case.cases.append(
When(status=status_value, then=Value(str(self.model.STATUS[status_value]))),
)
return self.annotate(status_name=case)
def as_postgresql(self, compiler, connection):
geo_field = GeometryField(srid=self.srid) # Fake field to get SRID info
src_field = self.get_source_fields()[0]
geography = src_field.geography and self.srid == 4326
if geography:
self.source_expressions.append(Value(self.spheroid))
elif geo_field.geodetic(connection):
# Geometry fields with geodetic (lon/lat) coordinates need length_spheroid
self.function = 'ST_Length_Spheroid'
self.source_expressions.append(Value(geo_field._spheroid))
else:
dim = min(f.dim for f in self.get_source_fields() if f)
if dim > 2:
self.function = connection.ops.length3d
return super(Length, self).as_sql(compiler, connection)
def as_sqlite(self, compiler, connection):
func_name = connection.ops.spatial_function_name(self.name)
if func_name == 'ST_Translate' and len(self.source_expressions) < 4:
# Always provide the z parameter for ST_Translate (Spatialite >= 3.1)
self.source_expressions.append(Value(0))
elif func_name == 'ShiftCoords' and len(self.source_expressions) > 3:
raise ValueError("This version of Spatialite doesn't support 3D")
return super(Translate, self).as_sqlite(compiler, connection)
def as_postgresql(self, compiler, connection):
geo_field = GeometryField(srid=self.srid) # Fake field to get SRID info
if self.source_is_geography():
self.source_expressions.append(Value(self.spheroid))
elif geo_field.geodetic(connection):
# Geometry fields with geodetic (lon/lat) coordinates need length_spheroid
self.function = 'ST_Length_Spheroid'
self.source_expressions.append(Value(geo_field._spheroid))
else:
dim = min(f.dim for f in self.get_source_fields() if f)
if dim > 2:
self.function = connection.ops.length3d
return super(Length, self).as_sql(compiler, connection)
def as_sqlite(self, compiler, connection):
func_name = connection.ops.spatial_function_name(self.name)
if func_name == 'ST_Translate' and len(self.source_expressions) < 4:
# Always provide the z parameter for ST_Translate (Spatialite >= 3.1)
self.source_expressions.append(Value(0))
elif func_name == 'ShiftCoords' and len(self.source_expressions) > 3:
raise ValueError("This version of Spatialite doesn't support 3D")
return super(Translate, self).as_sqlite(compiler, connection)
def __init__(self, *expressions, **extra):
super(SearchVector, self).__init__(*expressions, **extra)
self.source_expressions = [
Coalesce(expression, Value('')) for expression in self.source_expressions
]
self.config = self.extra.get('config', self.config)
weight = self.extra.get('weight')
if weight is not None and not hasattr(weight, 'resolve_expression'):
weight = Value(weight)
self.weight = weight
def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False):
resolved = super(SearchVector, self).resolve_expression(query, allow_joins, reuse, summarize, for_save)
if self.config:
if not hasattr(self.config, 'resolve_expression'):
resolved.config = Value(self.config).resolve_expression(query, allow_joins, reuse, summarize, for_save)
else:
resolved.config = self.config.resolve_expression(query, allow_joins, reuse, summarize, for_save)
return resolved
def __init__(self, vector, query, **extra):
if not hasattr(vector, 'resolve_expression'):
vector = SearchVector(vector)
if not hasattr(query, 'resolve_expression'):
query = SearchQuery(query)
weights = extra.get('weights')
if weights is not None and not hasattr(weights, 'resolve_expression'):
weights = Value(weights)
self.weights = weights
super(SearchRank, self).__init__(vector, query, **extra)
def __init__(self, expression, string, **extra):
if not hasattr(string, 'resolve_expression'):
string = Value(string)
super(TrigramBase, self).__init__(expression, string, output_field=FloatField(), **extra)
def as_postgresql(self, compiler, connection):
geo_field = GeometryField(srid=self.srid) # Fake field to get SRID info
if self.source_is_geography():
self.source_expressions.append(Value(self.spheroid))
elif geo_field.geodetic(connection):
# Geometry fields with geodetic (lon/lat) coordinates need length_spheroid
self.function = connection.ops.spatial_function_name('LengthSpheroid')
self.source_expressions.append(Value(geo_field._spheroid))
else:
dim = min(f.dim for f in self.get_source_fields() if f)
if dim > 2:
self.function = connection.ops.length3d
return super(Length, self).as_sql(compiler, connection)
def as_sqlite(self, compiler, connection):
if len(self.source_expressions) < 4:
# Always provide the z parameter for ST_Translate
self.source_expressions.append(Value(0))
return super(Translate, self).as_sqlite(compiler, connection)
def __init__(self, *expressions, **extra):
super(SearchVector, self).__init__(*expressions, **extra)
self.source_expressions = [
Coalesce(expression, Value('')) for expression in self.source_expressions
]
self.config = self.extra.get('config', self.config)
weight = self.extra.get('weight')
if weight is not None and not hasattr(weight, 'resolve_expression'):
weight = Value(weight)
self.weight = weight
def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False):
resolved = super(SearchVector, self).resolve_expression(query, allow_joins, reuse, summarize, for_save)
if self.config:
if not hasattr(self.config, 'resolve_expression'):
resolved.config = Value(self.config).resolve_expression(query, allow_joins, reuse, summarize, for_save)
else:
resolved.config = self.config.resolve_expression(query, allow_joins, reuse, summarize, for_save)
return resolved
def __init__(self, vector, query, **extra):
if not hasattr(vector, 'resolve_expression'):
vector = SearchVector(vector)
if not hasattr(query, 'resolve_expression'):
query = SearchQuery(query)
weights = extra.get('weights')
if weights is not None and not hasattr(weights, 'resolve_expression'):
weights = Value(weights)
self.weights = weights
super(SearchRank, self).__init__(vector, query, **extra)
def __init__(self, expression, string, **extra):
if not hasattr(string, 'resolve_expression'):
string = Value(string)
super(TrigramBase, self).__init__(expression, string, output_field=FloatField(), **extra)