def to_dict(self):
d = model_to_dict(self)
tastyjson = self._serializer.to_json(d)
d = self._serializer.from_json(tastyjson)
d[DOC_TYPE_FIELD_NAME] = self.get_doc_type()
d['id'] = self.get_id()
if 'cbnosync_ptr' in d: del d['cbnosync_ptr']
if 'csrfmiddlewaretoken' in d: del d['csrfmiddlewaretoken']
for field in self._meta.fields:
if isinstance(field, DateTimeField):
d[field.name] = self._string_from_date(field.name)
if isinstance(field, ListField):
if isinstance(field.item_field, EmbeddedModelField):
self.to_dict_nested_list(field.name, d)
if isinstance(field.item_field, ModelReferenceField):
self.to_dict_reference_list(field.name, d)
if isinstance(field, EmbeddedModelField):
self.to_dict_nested(field.name, d)
if isinstance(field, ModelReferenceField):
self.to_dict_reference(field.name, d)
return d
python类DateTimeField()的实例源码
def check_expression_support(self, expression):
bad_fields = (fields.DateField, fields.DateTimeField, fields.TimeField)
bad_aggregates = (aggregates.Sum, aggregates.Avg, aggregates.Variance, aggregates.StdDev)
if isinstance(expression, bad_aggregates):
for expr in expression.get_source_expressions():
try:
output_field = expr.output_field
if isinstance(output_field, bad_fields):
raise NotImplementedError(
'You cannot use Sum, Avg, StdDev, and Variance '
'aggregations on date/time fields in sqlite3 '
'since date/time is saved as text.'
)
except FieldError:
# Not every subexpression has an output_field which is fine
# to ignore.
pass
def from_dict(self, dict_payload):
for field in self._meta.fields:
if field.name not in dict_payload:
continue
if isinstance(field, EmbeddedModelField):
self.from_dict_nested(field.name, field.embedded_model, dict_payload)
continue
if isinstance(field, ListField):
if isinstance(field.item_field, EmbeddedModelField):
self.from_dict_nested_list(field.name, field.item_field.embedded_model, dict_payload)
continue
if isinstance(field, DateTimeField):
self._date_from_string(field.name, dict_payload.get(field.name))
elif isinstance(field, DecimalField):
self._decimal_from_string(field.name, dict_payload.get(field.name))
elif field.name in dict_payload:
setattr(self, field.name, dict_payload[field.name])
if 'id' in dict_payload.keys():
self.id = dict_payload['id']
def to_dict(self):
d = model_to_dict(self)
tastyjson = self._serializer.to_json(d)
d = self._serializer.from_json(tastyjson)
d[DOC_TYPE_FIELD_NAME] = self.get_doc_type()
d['id'] = self.get_id()
if 'cbnosync_ptr' in d: del d['cbnosync_ptr']
if 'csrfmiddlewaretoken' in d: del d['csrfmiddlewaretoken']
for field in self._meta.fields:
if isinstance(field, DateTimeField):
d[field.name] = self._string_from_date(field.name)
if isinstance(field, ListField):
if isinstance(field.item_field, EmbeddedModelField):
self.to_dict_nested_list(field.name, d)
if isinstance(field.item_field, ModelReferenceField):
self.to_dict_reference_list(field.name, d)
if isinstance(field, EmbeddedModelField):
self.to_dict_nested(field.name, d)
if isinstance(field, ModelReferenceField):
self.to_dict_reference(field.name, d)
return d
def from_dict(self, dict_payload):
for field in self._meta.fields:
if field.name not in dict_payload:
continue
if isinstance(field, EmbeddedModelField):
self.from_dict_nested(field.name, field.embedded_model, dict_payload)
continue
if isinstance(field, ListField):
if isinstance(field.item_field, EmbeddedModelField):
self.from_dict_nested_list(field.name, field.item_field.embedded_model, dict_payload)
continue
if isinstance(field, DateTimeField):
self._date_from_string(field.name, dict_payload.get(field.name))
elif isinstance(field, DecimalField):
self._decimal_from_string(field.name, dict_payload.get(field.name))
elif field.name in dict_payload:
setattr(self, field.name, dict_payload[field.name])
if 'id' in dict_payload.keys():
self.id = dict_payload['id']
def get_django_field_map(self):
from django.db.models import fields as djf
return [
(djf.AutoField, PrimaryKeyField),
(djf.BigIntegerField, BigIntegerField),
# (djf.BinaryField, BlobField),
(djf.BooleanField, BooleanField),
(djf.CharField, CharField),
(djf.DateTimeField, DateTimeField), # Extends DateField.
(djf.DateField, DateField),
(djf.DecimalField, DecimalField),
(djf.FilePathField, CharField),
(djf.FloatField, FloatField),
(djf.IntegerField, IntegerField),
(djf.NullBooleanField, partial(BooleanField, null=True)),
(djf.TextField, TextField),
(djf.TimeField, TimeField),
(djf.related.ForeignKey, ForeignKeyField),
]
def check_expression_support(self, expression):
bad_fields = (fields.DateField, fields.DateTimeField, fields.TimeField)
bad_aggregates = (aggregates.Sum, aggregates.Avg, aggregates.Variance, aggregates.StdDev)
if isinstance(expression, bad_aggregates):
for expr in expression.get_source_expressions():
try:
output_field = expr.output_field
if isinstance(output_field, bad_fields):
raise NotImplementedError(
'You cannot use Sum, Avg, StdDev, and Variance '
'aggregations on date/time fields in sqlite3 '
'since date/time is saved as text.'
)
except FieldError:
# Not every subexpression has an output_field which is fine
# to ignore.
pass
def get_django_field_map(self):
from django.db.models import fields as djf
return [
(djf.AutoField, PrimaryKeyField),
(djf.BigIntegerField, BigIntegerField),
# (djf.BinaryField, BlobField),
(djf.BooleanField, BooleanField),
(djf.CharField, CharField),
(djf.DateTimeField, DateTimeField), # Extends DateField.
(djf.DateField, DateField),
(djf.DecimalField, DecimalField),
(djf.FilePathField, CharField),
(djf.FloatField, FloatField),
(djf.IntegerField, IntegerField),
(djf.NullBooleanField, partial(BooleanField, null=True)),
(djf.TextField, TextField),
(djf.TimeField, TimeField),
(djf.related.ForeignKey, ForeignKeyField),
]
def get_django_field_map(self):
from django.db.models import fields as djf
return [
(djf.AutoField, PrimaryKeyField),
(djf.BigIntegerField, BigIntegerField),
# (djf.BinaryField, BlobField),
(djf.BooleanField, BooleanField),
(djf.CharField, CharField),
(djf.DateTimeField, DateTimeField), # Extends DateField.
(djf.DateField, DateField),
(djf.DecimalField, DecimalField),
(djf.FilePathField, CharField),
(djf.FloatField, FloatField),
(djf.IntegerField, IntegerField),
(djf.NullBooleanField, partial(BooleanField, null=True)),
(djf.TextField, TextField),
(djf.TimeField, TimeField),
(djf.related.ForeignKey, ForeignKeyField),
]
def check_expression_support(self, expression):
bad_fields = (fields.DateField, fields.DateTimeField, fields.TimeField)
bad_aggregates = (aggregates.Sum, aggregates.Avg, aggregates.Variance, aggregates.StdDev)
if isinstance(expression, bad_aggregates):
for expr in expression.get_source_expressions():
try:
output_field = expr.output_field
if isinstance(output_field, bad_fields):
raise NotImplementedError(
'You cannot use Sum, Avg, StdDev, and Variance '
'aggregations on date/time fields in sqlite3 '
'since date/time is saved as text.'
)
except FieldError:
# Not every subexpression has an output_field which is fine
# to ignore.
pass
def get_db_converters(self, expression):
converters = super(DatabaseOperations, self).get_db_converters(expression)
internal_type = expression.output_field.get_internal_type()
if internal_type == 'DateTimeField':
converters.append(self.convert_datetimefield_value)
elif internal_type == 'DateField':
converters.append(self.convert_datefield_value)
elif internal_type == 'TimeField':
converters.append(self.convert_timefield_value)
elif internal_type == 'DecimalField':
converters.append(self.convert_decimalfield_value)
elif internal_type == 'UUIDField':
converters.append(self.convert_uuidfield_value)
elif internal_type in ('NullBooleanField', 'BooleanField'):
converters.append(self.convert_booleanfield_value)
return converters
def check_expression_support(self, expression):
bad_fields = (fields.DateField, fields.DateTimeField, fields.TimeField)
bad_aggregates = (aggregates.Sum, aggregates.Avg, aggregates.Variance, aggregates.StdDev)
if isinstance(expression, bad_aggregates):
for expr in expression.get_source_expressions():
try:
output_field = expr.output_field
if isinstance(output_field, bad_fields):
raise NotImplementedError(
'You cannot use Sum, Avg, StdDev, and Variance '
'aggregations on date/time fields in sqlite3 '
'since date/time is saved as text.'
)
except FieldError:
# Not every subexpression has an output_field which is fine
# to ignore.
pass
def results_iter(self):
if self.connection.ops.oracle:
from django.db.models.fields import DateTimeField
fields = [DateTimeField()]
else:
needs_string_cast = self.connection.features.needs_datetime_string_cast
offset = len(self.query.extra_select)
for rows in self.execute_sql(MULTI):
for row in rows:
date = row[offset]
if self.connection.ops.oracle:
date = self.resolve_columns(row, fields)[offset]
elif needs_string_cast:
date = typecast_date(str(date))
if isinstance(date, datetime.datetime):
date = date.date()
yield date
def results_iter(self):
if self.connection.ops.oracle:
from django.db.models.fields import DateTimeField
fields = [DateTimeField()]
else:
needs_string_cast = self.connection.features.needs_datetime_string_cast
offset = len(self.query.extra_select)
for rows in self.execute_sql(MULTI):
for row in rows:
datetime = row[offset]
if self.connection.ops.oracle:
datetime = self.resolve_columns(row, fields)[offset]
elif needs_string_cast:
datetime = typecast_timestamp(str(datetime))
# Datetimes are artifically returned in UTC on databases that
# don't support time zone. Restore the zone used in the query.
if settings.USE_TZ:
datetime = datetime.replace(tzinfo=None)
datetime = timezone.make_aware(datetime, self.query.tzinfo)
yield datetime
def check_expression_support(self, expression):
bad_fields = (fields.DateField, fields.DateTimeField, fields.TimeField)
bad_aggregates = (aggregates.Sum, aggregates.Avg, aggregates.Variance, aggregates.StdDev)
if isinstance(expression, bad_aggregates):
for expr in expression.get_source_expressions():
try:
output_field = expr.output_field
if isinstance(output_field, bad_fields):
raise NotImplementedError(
'You cannot use Sum, Avg, StdDev, and Variance '
'aggregations on date/time fields in sqlite3 '
'since date/time is saved as text.'
)
except FieldError:
# Not every subexpression has an output_field which is fine
# to ignore.
pass
def check_expression_support(self, expression):
bad_fields = (fields.DateField, fields.DateTimeField, fields.TimeField)
bad_aggregates = (aggregates.Sum, aggregates.Avg, aggregates.Variance, aggregates.StdDev)
if isinstance(expression, bad_aggregates):
for expr in expression.get_source_expressions():
try:
output_field = expr.output_field
if isinstance(output_field, bad_fields):
raise NotImplementedError(
'You cannot use Sum, Avg, StdDev, and Variance '
'aggregations on date/time fields in sqlite3 '
'since date/time is saved as text.'
)
except FieldError:
# Not every subexpression has an output_field which is fine
# to ignore.
pass
def get_db_converters(self, expression):
converters = super(DatabaseOperations, self).get_db_converters(expression)
internal_type = expression.output_field.get_internal_type()
if internal_type == 'DateTimeField':
converters.append(self.convert_datetimefield_value)
elif internal_type == 'DateField':
converters.append(self.convert_datefield_value)
elif internal_type == 'TimeField':
converters.append(self.convert_timefield_value)
elif internal_type == 'DecimalField':
converters.append(self.convert_decimalfield_value)
elif internal_type == 'UUIDField':
converters.append(self.convert_uuidfield_value)
elif internal_type in ('NullBooleanField', 'BooleanField'):
converters.append(self.convert_booleanfield_value)
return converters
def annotate_channel_queryset_with_latest_activity_at(queryset, user):
return queryset.annotate(
latest_activity_timestamp=Max('action_targets__timestamp'),
).annotate(
latest_activity_at=Case(
When(
latest_activity_timestamp__isnull=True,
then='created_at'
),
When(
latest_activity_timestamp__gt=F('created_at'),
then='latest_activity_timestamp'
),
default='created_at',
output_field=DateTimeField()
)
)
_django_db_models_base.py 文件源码
项目:Tinychat-Bot--Discontinued
作者: Tinychat
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def _encodeValue(self, field, value):
if value is fields.NOT_PROVIDED:
return pyamf.Undefined
if value is None:
return value
# deal with dates ..
if isinstance(field, fields.DateTimeField):
return value
elif isinstance(field, fields.DateField):
return datetime.datetime(
value.year,
value.month,
value.day,
0, # hour
0, # minute
0, # second
)
elif isinstance(field, fields.TimeField):
return datetime.datetime(
1970, # year
1, # month
1, # day
value.hour,
value.minute,
value.second,
value.microsecond
)
elif isinstance(value, files.FieldFile):
return value.name
return value
_django_db_models_base.py 文件源码
项目:Tinychat-Bot--Discontinued
作者: Tinychat
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def _decodeValue(self, field, value):
if value is pyamf.Undefined:
return fields.NOT_PROVIDED
if isinstance(field, fields.AutoField) and value == 0:
return None
elif isinstance(field, fields.DateTimeField):
# deal with dates
return value
elif isinstance(field, fields.DateField):
if not value:
return None
return datetime.date(value.year, value.month, value.day)
elif isinstance(field, fields.TimeField):
if not value:
return None
return datetime.time(
value.hour,
value.minute,
value.second,
value.microsecond,
)
return value
def get_db_converters(self, expression):
converters = super(DatabaseOperations, self).get_db_converters(expression)
internal_type = expression.output_field.get_internal_type()
if internal_type == 'DateTimeField':
converters.append(self.convert_datetimefield_value)
elif internal_type == 'DateField':
converters.append(self.convert_datefield_value)
elif internal_type == 'TimeField':
converters.append(self.convert_timefield_value)
elif internal_type == 'DecimalField':
converters.append(self.convert_decimalfield_value)
elif internal_type == 'UUIDField':
converters.append(self.convert_uuidfield_value)
return converters
def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False):
copy = self.copy()
copy.col = query.resolve_ref(self.lookup, allow_joins, reuse, summarize)
field = copy.col.output_field
assert isinstance(field, fields.DateField), "%r isn't a DateField." % field.name
if settings.USE_TZ:
assert not isinstance(field, fields.DateTimeField), (
"%r is a DateTimeField, not a DateField." % field.name
)
return copy
def __init__(self, lookup, lookup_type, tzinfo):
super(DateTime, self).__init__(output_field=fields.DateTimeField())
self.lookup = lookup
self.col = None
self.lookup_type = lookup_type
if tzinfo is None:
self.tzname = None
else:
self.tzname = timezone._get_timezone_name(tzinfo)
self.tzinfo = tzinfo
def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False):
copy = self.copy()
copy.col = query.resolve_ref(self.lookup, allow_joins, reuse, summarize)
field = copy.col.output_field
assert isinstance(field, fields.DateTimeField), (
"%r isn't a DateTimeField." % field.name
)
return copy
def check_expression_support(self, expression):
bad_fields = (fields.DateField, fields.DateTimeField, fields.TimeField)
bad_aggregates = (aggregates.Sum, aggregates.Avg, aggregates.Variance, aggregates.StdDev)
if isinstance(expression, bad_aggregates):
try:
output_field = expression.input_field.output_field
if isinstance(output_field, bad_fields):
raise NotImplementedError(
'You cannot use Sum, Avg, StdDev and Variance aggregations '
'on date/time fields in sqlite3 '
'since date/time is saved as text.')
except FieldError:
# not every sub-expression has an output_field which is fine to
# ignore
pass
def get_db_converters(self, expression):
converters = super(DatabaseOperations, self).get_db_converters(expression)
internal_type = expression.output_field.get_internal_type()
if internal_type == 'DateTimeField':
converters.append(self.convert_datetimefield_value)
elif internal_type == 'DateField':
converters.append(self.convert_datefield_value)
elif internal_type == 'TimeField':
converters.append(self.convert_timefield_value)
elif internal_type == 'DecimalField':
converters.append(self.convert_decimalfield_value)
elif internal_type == 'UUIDField':
converters.append(self.convert_uuidfield_value)
return converters
def check_expression_support(self, expression):
bad_fields = (fields.DateField, fields.DateTimeField, fields.TimeField)
bad_aggregates = (aggregates.Sum, aggregates.Avg, aggregates.Variance, aggregates.StdDev)
if isinstance(expression, bad_aggregates):
try:
output_field = expression.input_field.output_field
if isinstance(output_field, bad_fields):
raise NotImplementedError(
'You cannot use Sum, Avg, StdDev and Variance aggregations '
'on date/time fields in sqlite3 '
'since date/time is saved as text.')
except FieldError:
# not every sub-expression has an output_field which is fine to
# ignore
pass
def get_db_converters(self, expression):
converters = super(DatabaseOperations, self).get_db_converters(expression)
internal_type = expression.output_field.get_internal_type()
if internal_type == 'DateTimeField':
converters.append(self.convert_datetimefield_value)
elif internal_type == 'DateField':
converters.append(self.convert_datefield_value)
elif internal_type == 'TimeField':
converters.append(self.convert_timefield_value)
elif internal_type == 'DecimalField':
converters.append(self.convert_decimalfield_value)
elif internal_type == 'UUIDField':
converters.append(self.convert_uuidfield_value)
return converters
def get_db_converters(self, expression):
converters = super(DatabaseOperations, self).get_db_converters(expression)
internal_type = expression.output_field.get_internal_type()
if internal_type == 'DateTimeField':
converters.append(self.convert_datetimefield_value)
elif internal_type == 'DateField':
converters.append(self.convert_datefield_value)
elif internal_type == 'TimeField':
converters.append(self.convert_timefield_value)
elif internal_type == 'DecimalField':
converters.append(self.convert_decimalfield_value)
elif internal_type == 'UUIDField':
converters.append(self.convert_uuidfield_value)
return converters
def __init__(self, output_field=None, **extra):
if output_field is None:
output_field = fields.DateTimeField()
super(Now, self).__init__(output_field=output_field, **extra)