def from_current_timezone(value):
"""
When time zone support is enabled, convert naive datetimes
entered in the current time zone to aware datetimes.
"""
if settings.USE_TZ and value is not None and timezone.is_naive(value):
current_timezone = timezone.get_current_timezone()
try:
return timezone.make_aware(value, current_timezone)
except Exception:
message = _(
'%(datetime)s couldn\'t be interpreted '
'in time zone %(current_timezone)s; it '
'may be ambiguous or it may not exist.'
)
params = {'datetime': value, 'current_timezone': current_timezone}
six.reraise(ValidationError, ValidationError(
message,
code='ambiguous_timezone',
params=params,
), sys.exc_info()[2])
return value
python类USE_TZ的实例源码
def has_key(self, key, version=None):
key = self.make_key(key, version=version)
self.validate_key(key)
db = router.db_for_read(self.cache_model_class)
connection = connections[db]
table = connection.ops.quote_name(self._table)
if settings.USE_TZ:
now = datetime.utcnow()
else:
now = datetime.now()
now = now.replace(microsecond=0)
with connection.cursor() as cursor:
cursor.execute("SELECT cache_key FROM %s "
"WHERE cache_key = %%s and expires > %%s" % table,
[key, connection.ops.adapt_datetimefield_value(now)])
return cursor.fetchone() is not None
def from_current_timezone(value):
"""
When time zone support is enabled, convert naive datetimes
entered in the current time zone to aware datetimes.
"""
if settings.USE_TZ and value is not None and timezone.is_naive(value):
current_timezone = timezone.get_current_timezone()
try:
return timezone.make_aware(value, current_timezone)
except Exception:
message = _(
'%(datetime)s couldn\'t be interpreted '
'in time zone %(current_timezone)s; it '
'may be ambiguous or it may not exist.'
)
params = {'datetime': value, 'current_timezone': current_timezone}
six.reraise(ValidationError, ValidationError(
message,
code='ambiguous_timezone',
params=params,
), sys.exc_info()[2])
return value
def has_key(self, key, version=None):
key = self.make_key(key, version=version)
self.validate_key(key)
db = router.db_for_read(self.cache_model_class)
table = connections[db].ops.quote_name(self._table)
if settings.USE_TZ:
now = datetime.utcnow()
else:
now = datetime.now()
now = now.replace(microsecond=0)
with connections[db].cursor() as cursor:
cursor.execute("SELECT cache_key FROM %s "
"WHERE cache_key = %%s and expires > %%s" % table,
[key, connections[db].ops.value_to_db_datetime(now)])
return cursor.fetchone() is not None
def _cull(self, db, cursor, now):
if self._cull_frequency == 0:
self.clear()
else:
# When USE_TZ is True, 'now' will be an aware datetime in UTC.
now = now.replace(tzinfo=None)
table = connections[db].ops.quote_name(self._table)
cursor.execute("DELETE FROM %s WHERE expires < %%s" % table,
[connections[db].ops.value_to_db_datetime(now)])
cursor.execute("SELECT COUNT(*) FROM %s" % table)
num = cursor.fetchone()[0]
if num > self._max_entries:
cull_num = num // self._cull_frequency
cursor.execute(
connections[db].ops.cache_key_culling_sql() % table,
[cull_num])
cursor.execute("DELETE FROM %s "
"WHERE cache_key < %%s" % table,
[cursor.fetchone()[0]])
def datetime_trunc_sql(self, lookup_type, field_name, tzname):
if settings.USE_TZ:
field_name = "CONVERT_TZ(%s, 'UTC', %%s)" % field_name
params = [tzname]
else:
params = []
fields = ['year', 'month', 'day', 'hour', 'minute', 'second']
format = ('%%Y-', '%%m', '-%%d', ' %%H:', '%%i', ':%%s') # Use double percents to escape.
format_def = ('0000-', '01', '-01', ' 00:', '00', ':00')
try:
i = fields.index(lookup_type) + 1
except ValueError:
sql = field_name
else:
format_str = ''.join([f for f in format[:i]] + [f for f in format_def[i:]])
sql = "CAST(DATE_FORMAT(%s, '%s') AS DATETIME)" % (field_name, format_str)
return sql, params
def adapt_datetime_with_timezone_support(value, conv):
# Equivalent to DateTimeField.get_db_prep_value. Used only by raw SQL.
if settings.USE_TZ:
if timezone.is_naive(value):
warnings.warn("MySQL received a naive datetime (%s)"
" while time zone support is active." % value,
RuntimeWarning)
default_timezone = timezone.get_default_timezone()
value = timezone.make_aware(value, default_timezone)
value = value.astimezone(timezone.utc).replace(tzinfo=None)
return Thing2Literal(value.strftime("%Y-%m-%d %H:%M:%S.%f"), conv)
# MySQLdb-1.2.1 returns TIME columns as timedelta -- they are more like
# timedelta in terms of actual behavior as they are signed and include days --
# and Django expects time, so we still need to override that. We also need to
# add special handling for SafeText and SafeBytes as MySQLdb's type
# checking is too tight to catch those (see Django ticket #6052).
# Finally, MySQLdb always returns naive datetime objects. However, when
# timezone support is active, Django expects timezone-aware datetime objects.
def value_to_db_datetime(self, value):
"""
Transform a datetime value to an object compatible with what is expected
by the backend driver for datetime columns.
If naive datetime is passed assumes that is in UTC. Normally Django
models.DateTimeField makes sure that if USE_TZ is True passed datetime
is timezone aware.
"""
if value is None:
return None
# cx_Oracle doesn't support tz-aware datetimes
if timezone.is_aware(value):
if settings.USE_TZ:
value = value.astimezone(timezone.utc).replace(tzinfo=None)
else:
raise ValueError("Oracle backend does not support timezone-aware datetimes when USE_TZ is False.")
return Oracle_datetime.from_datetime(value)
def from_current_timezone(value):
"""
When time zone support is enabled, convert naive datetimes
entered in the current time zone to aware datetimes.
"""
if settings.USE_TZ and value is not None and timezone.is_naive(value):
current_timezone = timezone.get_current_timezone()
try:
return timezone.make_aware(value, current_timezone)
except Exception:
message = _(
'%(datetime)s couldn\'t be interpreted '
'in time zone %(current_timezone)s; it '
'may be ambiguous or it may not exist.'
)
params = {'datetime': value, 'current_timezone': current_timezone}
six.reraise(ValidationError, ValidationError(
message,
code='ambiguous_timezone',
params=params,
), sys.exc_info()[2])
return value
def has_key(self, key, version=None):
key = self.make_key(key, version=version)
self.validate_key(key)
db = router.db_for_read(self.cache_model_class)
connection = connections[db]
table = connection.ops.quote_name(self._table)
if settings.USE_TZ:
now = datetime.utcnow()
else:
now = datetime.now()
now = now.replace(microsecond=0)
with connection.cursor() as cursor:
cursor.execute("SELECT cache_key FROM %s "
"WHERE cache_key = %%s and expires > %%s" % table,
[key, connection.ops.adapt_datetimefield_value(now)])
return cursor.fetchone() is not None
def from_current_timezone(value):
"""
When time zone support is enabled, convert naive datetimes
entered in the current time zone to aware datetimes.
"""
if settings.USE_TZ and value is not None and timezone.is_naive(value):
current_timezone = timezone.get_current_timezone()
try:
return timezone.make_aware(value, current_timezone)
except Exception:
message = _(
'%(datetime)s couldn\'t be interpreted '
'in time zone %(current_timezone)s; it '
'may be ambiguous or it may not exist.'
)
params = {'datetime': value, 'current_timezone': current_timezone}
six.reraise(ValidationError, ValidationError(
message,
code='ambiguous_timezone',
params=params,
), sys.exc_info()[2])
return value
def has_key(self, key, version=None):
key = self.make_key(key, version=version)
self.validate_key(key)
db = router.db_for_read(self.cache_model_class)
connection = connections[db]
table = connection.ops.quote_name(self._table)
if settings.USE_TZ:
now = datetime.utcnow()
else:
now = datetime.now()
now = now.replace(microsecond=0)
with connection.cursor() as cursor:
cursor.execute("SELECT cache_key FROM %s "
"WHERE cache_key = %%s and expires > %%s" % table,
[key, connection.ops.adapt_datetimefield_value(now)])
return cursor.fetchone() is not None
def from_current_timezone(value):
"""
When time zone support is enabled, convert naive datetimes
entered in the current time zone to aware datetimes.
"""
if settings.USE_TZ and value is not None and timezone.is_naive(value):
current_timezone = timezone.get_current_timezone()
try:
return timezone.make_aware(value, current_timezone)
except Exception:
message = _(
'%(datetime)s couldn\'t be interpreted '
'in time zone %(current_timezone)s; it '
'may be ambiguous or it may not exist.'
)
params = {'datetime': value, 'current_timezone': current_timezone}
six.reraise(ValidationError, ValidationError(
message,
code='ambiguous_timezone',
params=params,
), sys.exc_info()[2])
return value
def has_key(self, key, version=None):
key = self.make_key(key, version=version)
self.validate_key(key)
db = router.db_for_read(self.cache_model_class)
table = connections[db].ops.quote_name(self._table)
if settings.USE_TZ:
now = datetime.utcnow()
else:
now = datetime.now()
now = now.replace(microsecond=0)
with connections[db].cursor() as cursor:
cursor.execute("SELECT cache_key FROM %s "
"WHERE cache_key = %%s and expires > %%s" % table,
[key, connections[db].ops.value_to_db_datetime(now)])
return cursor.fetchone() is not None
def _cull(self, db, cursor, now):
if self._cull_frequency == 0:
self.clear()
else:
# When USE_TZ is True, 'now' will be an aware datetime in UTC.
now = now.replace(tzinfo=None)
table = connections[db].ops.quote_name(self._table)
cursor.execute("DELETE FROM %s WHERE expires < %%s" % table,
[connections[db].ops.value_to_db_datetime(now)])
cursor.execute("SELECT COUNT(*) FROM %s" % table)
num = cursor.fetchone()[0]
if num > self._max_entries:
cull_num = num // self._cull_frequency
cursor.execute(
connections[db].ops.cache_key_culling_sql() % table,
[cull_num])
cursor.execute("DELETE FROM %s "
"WHERE cache_key < %%s" % table,
[cursor.fetchone()[0]])
def datetime_trunc_sql(self, lookup_type, field_name, tzname):
if settings.USE_TZ:
field_name = "CONVERT_TZ(%s, 'UTC', %%s)" % field_name
params = [tzname]
else:
params = []
fields = ['year', 'month', 'day', 'hour', 'minute', 'second']
format = ('%%Y-', '%%m', '-%%d', ' %%H:', '%%i', ':%%s') # Use double percents to escape.
format_def = ('0000-', '01', '-01', ' 00:', '00', ':00')
try:
i = fields.index(lookup_type) + 1
except ValueError:
sql = field_name
else:
format_str = ''.join([f for f in format[:i]] + [f for f in format_def[i:]])
sql = "CAST(DATE_FORMAT(%s, '%s') AS DATETIME)" % (field_name, format_str)
return sql, params
def adapt_datetime_with_timezone_support(value, conv):
# Equivalent to DateTimeField.get_db_prep_value. Used only by raw SQL.
if settings.USE_TZ:
if timezone.is_naive(value):
warnings.warn("MySQL received a naive datetime (%s)"
" while time zone support is active." % value,
RuntimeWarning)
default_timezone = timezone.get_default_timezone()
value = timezone.make_aware(value, default_timezone)
value = value.astimezone(timezone.utc).replace(tzinfo=None)
return Thing2Literal(value.strftime("%Y-%m-%d %H:%M:%S.%f"), conv)
# MySQLdb-1.2.1 returns TIME columns as timedelta -- they are more like
# timedelta in terms of actual behavior as they are signed and include days --
# and Django expects time, so we still need to override that. We also need to
# add special handling for SafeText and SafeBytes as MySQLdb's type
# checking is too tight to catch those (see Django ticket #6052).
# Finally, MySQLdb always returns naive datetime objects. However, when
# timezone support is active, Django expects timezone-aware datetime objects.
def from_current_timezone(value):
"""
When time zone support is enabled, convert naive datetimes
entered in the current time zone to aware datetimes.
"""
if settings.USE_TZ and value is not None and timezone.is_naive(value):
current_timezone = timezone.get_current_timezone()
try:
return timezone.make_aware(value, current_timezone)
except Exception:
message = _(
'%(datetime)s couldn\'t be interpreted '
'in time zone %(current_timezone)s; it '
'may be ambiguous or it may not exist.'
)
params = {'datetime': value, 'current_timezone': current_timezone}
six.reraise(ValidationError, ValidationError(
message,
code='ambiguous_timezone',
params=params,
), sys.exc_info()[2])
return value
def has_key(self, key, version=None):
key = self.make_key(key, version=version)
self.validate_key(key)
db = router.db_for_read(self.cache_model_class)
table = connections[db].ops.quote_name(self._table)
if settings.USE_TZ:
now = datetime.utcnow()
else:
now = datetime.now()
now = now.replace(microsecond=0)
with connections[db].cursor() as cursor:
cursor.execute("SELECT cache_key FROM %s "
"WHERE cache_key = %%s and expires > %%s" % table,
[key, connections[db].ops.value_to_db_datetime(now)])
return cursor.fetchone() is not None
def _cull(self, db, cursor, now):
if self._cull_frequency == 0:
self.clear()
else:
# When USE_TZ is True, 'now' will be an aware datetime in UTC.
now = now.replace(tzinfo=None)
table = connections[db].ops.quote_name(self._table)
cursor.execute("DELETE FROM %s WHERE expires < %%s" % table,
[connections[db].ops.value_to_db_datetime(now)])
cursor.execute("SELECT COUNT(*) FROM %s" % table)
num = cursor.fetchone()[0]
if num > self._max_entries:
cull_num = num // self._cull_frequency
cursor.execute(
connections[db].ops.cache_key_culling_sql() % table,
[cull_num])
cursor.execute("DELETE FROM %s "
"WHERE cache_key < %%s" % table,
[cursor.fetchone()[0]])