def year_lookup_bounds(self, connection, year):
    """
    Return the lookup bounds for ``year`` as computed by the backend ops.

    The datetime-specific helper on ``connection.ops`` is used when the
    output field of the inner left-hand side is a ``DateTimeField``;
    otherwise the date-specific helper is used.
    """
    if isinstance(self.lhs.lhs.output_field, DateTimeField):
        return connection.ops.year_lookup_bounds_for_datetime_field(year)
    return connection.ops.year_lookup_bounds_for_date_field(year)
# Example source code using Python's DateTimeField() class
def __init__(self, output_field=None, **extra):
    """
    Initialise the expression, defaulting ``output_field`` to a fresh
    ``DateTimeField`` when the caller does not supply one.

    Any additional keyword arguments are forwarded to the parent class.
    """
    resolved_field = fields.DateTimeField() if output_field is None else output_field
    super(Now, self).__init__(output_field=resolved_field, **extra)
def get_db_converters(self, expression):
    """
    Return the parent's converters for ``expression``, plus the converter
    method of this object that matches the internal type of the
    expression's output field (if any).
    """
    # Maps an internal field type to the name of the converter method on
    # self; the method is looked up only for the type actually matched.
    converter_names = {
        'DateTimeField': 'convert_datetimefield_value',
        'DateField': 'convert_datefield_value',
        'TimeField': 'convert_timefield_value',
        'DecimalField': 'convert_decimalfield_value',
        'UUIDField': 'convert_uuidfield_value',
    }
    converters = super(DatabaseOperations, self).get_db_converters(expression)
    method_name = converter_names.get(expression.output_field.get_internal_type())
    if method_name is not None:
        converters.append(getattr(self, method_name))
    return converters
def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False):
    """
    Return a copy of this expression whose ``col`` is the reference
    resolved from ``self.lookup`` against ``query``.

    Asserts that the resolved column's output field is a ``DateField``
    and, when ``settings.USE_TZ`` is enabled, that it is not a
    ``DateTimeField``.
    """
    clone = self.copy()
    clone.col = query.resolve_ref(self.lookup, allow_joins, reuse, summarize)
    resolved_field = clone.col.output_field
    assert isinstance(resolved_field, fields.DateField), (
        "%r isn't a DateField." % resolved_field.name
    )
    if settings.USE_TZ:
        is_datetime = isinstance(resolved_field, fields.DateTimeField)
        assert not is_datetime, "%r is a DateTimeField, not a DateField." % resolved_field.name
    return clone
def __init__(self, lookup, lookup_type, tzinfo):
    """
    Store the lookup, lookup type and time zone for this expression.

    The parent is initialised with a ``DateTimeField`` output field.
    ``tzname`` is ``None`` when no ``tzinfo`` is given; otherwise it is
    the name reported by ``timezone._get_timezone_name``.
    """
    super(DateTime, self).__init__(output_field=fields.DateTimeField())
    self.lookup = lookup
    self.lookup_type = lookup_type
    self.col = None  # filled in later on the copy made by resolve_expression
    self.tzname = None if tzinfo is None else timezone._get_timezone_name(tzinfo)
    self.tzinfo = tzinfo
def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False):
    """
    Return a copy of this expression whose ``col`` is the reference
    resolved from ``self.lookup`` against ``query``.

    Asserts that the resolved column's output field is a
    ``DateTimeField``.
    """
    clone = self.copy()
    clone.col = query.resolve_ref(self.lookup, allow_joins, reuse, summarize)
    resolved_field = clone.col.output_field
    is_datetime = isinstance(resolved_field, fields.DateTimeField)
    assert is_datetime, "%r isn't a DateTimeField." % resolved_field.name
    return clone
def korben_view(request, model):
    """
    View for Korben.

    Update the ``model`` instance whose primary key is ``data['id']`` with
    every key/value of the request data, or build a new instance from the
    data when no such row exists. Values of ``DateTimeField`` fields are
    then run through ``parse_date`` before the object is saved.

    Returns a 400 response echoing the data when a non-nullable datetime
    field cannot be parsed, otherwise an ``OK`` message response.
    """
    payload = request.data
    try:
        instance = model.objects.get(pk=payload['id'])
        for attr_name, attr_value in payload.items():
            setattr(instance, attr_name, attr_value)
    except model.DoesNotExist:
        instance = model(**payload)

    # create datetime objects for datetime fields
    for model_field in instance._meta.fields:
        if not isinstance(model_field, DateTimeField):
            continue
        try:
            parsed = parse_date(getattr(instance, model_field.name, None))
            setattr(instance, model_field.name, parsed)
        except (ValueError, AttributeError):
            # An unparseable value is tolerated only for nullable fields.
            if not model_field.null:
                return Response(data=payload, status=HTTP_400_BAD_REQUEST)

    instance.save(as_korben=True)  # data comes from Korben, kill validation
    return Response(data={'message': 'OK'})
def get_db_converters(self, expression):
    """
    Return the parent's converters for ``expression``, plus the converter
    method of this object that matches the internal type of the
    expression's output field (if any).
    """
    # Maps an internal field type to the name of the converter method on
    # self; the method is looked up only for the type actually matched.
    converter_names = {
        'DateTimeField': 'convert_datetimefield_value',
        'DateField': 'convert_datefield_value',
        'TimeField': 'convert_timefield_value',
        'DecimalField': 'convert_decimalfield_value',
        'UUIDField': 'convert_uuidfield_value',
    }
    converters = super(DatabaseOperations, self).get_db_converters(expression)
    method_name = converter_names.get(expression.output_field.get_internal_type())
    if method_name is not None:
        converters.append(getattr(self, method_name))
    return converters
def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False):
    """
    Return a copy of this expression whose ``col`` is the reference
    resolved from ``self.lookup`` against ``query``.

    Asserts that the resolved column's output field is a ``DateField``
    and, when ``settings.USE_TZ`` is enabled, that it is not a
    ``DateTimeField``.
    """
    clone = self.copy()
    clone.col = query.resolve_ref(self.lookup, allow_joins, reuse, summarize)
    resolved_field = clone.col.output_field
    assert isinstance(resolved_field, fields.DateField), (
        "%r isn't a DateField." % resolved_field.name
    )
    if settings.USE_TZ:
        is_datetime = isinstance(resolved_field, fields.DateTimeField)
        assert not is_datetime, "%r is a DateTimeField, not a DateField." % resolved_field.name
    return clone
def __init__(self, lookup, lookup_type, tzinfo):
    """
    Store the lookup, lookup type and time zone for this expression.

    The parent is initialised with a ``DateTimeField`` output field.
    ``tzname`` is ``None`` when no ``tzinfo`` is given; otherwise it is
    the name reported by ``timezone._get_timezone_name``.
    """
    super(DateTime, self).__init__(output_field=fields.DateTimeField())
    self.lookup = lookup
    self.lookup_type = lookup_type
    self.col = None  # filled in later on the copy made by resolve_expression
    self.tzname = None if tzinfo is None else timezone._get_timezone_name(tzinfo)
    self.tzinfo = tzinfo
def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False):
    """
    Return a copy of this expression whose ``col`` is the reference
    resolved from ``self.lookup`` against ``query``.

    Asserts that the resolved column's output field is a
    ``DateTimeField``.
    """
    clone = self.copy()
    clone.col = query.resolve_ref(self.lookup, allow_joins, reuse, summarize)
    resolved_field = clone.col.output_field
    is_datetime = isinstance(resolved_field, fields.DateTimeField)
    assert is_datetime, "%r isn't a DateTimeField." % resolved_field.name
    return clone
def __init__(self, output_field=None, **extra):
    """
    Initialise the expression, defaulting ``output_field`` to a fresh
    ``DateTimeField`` when the caller does not supply one.

    Any additional keyword arguments are forwarded to the parent class.
    """
    resolved_field = fields.DateTimeField() if output_field is None else output_field
    super(Now, self).__init__(output_field=resolved_field, **extra)
def check_expression_support(self, expression):
    """
    Reject Sum/Avg/Variance/StdDev aggregates over date/time fields.

    Raises ``NotImplementedError`` when ``expression`` is one of those
    aggregates and the output field of its input is a ``DateField``,
    ``DateTimeField`` or ``TimeField``. Any other expression passes
    silently.
    """
    unsupported_fields = (fields.DateField, fields.DateTimeField, fields.TimeField)
    unsupported_aggregates = (aggregates.Sum, aggregates.Avg, aggregates.Variance, aggregates.StdDev)
    if not isinstance(expression, unsupported_aggregates):
        return
    try:
        source_field = expression.input_field.output_field
    except FieldError:
        # not every sub-expression has an output_field which is fine to
        # ignore
        return
    if isinstance(source_field, unsupported_fields):
        raise NotImplementedError(
            'You cannot use Sum, Avg, StdDev and Variance aggregations '
            'on date/time fields in sqlite3 '
            'since date/time is saved as text.')
def get_db_converters(self, expression):
    """
    Return the parent's converters for ``expression``, plus the converter
    method of this object that matches the internal type of the
    expression's output field (if any).
    """
    # Maps an internal field type to the name of the converter method on
    # self; the method is looked up only for the type actually matched.
    converter_names = {
        'DateTimeField': 'convert_datetimefield_value',
        'DateField': 'convert_datefield_value',
        'TimeField': 'convert_timefield_value',
        'DecimalField': 'convert_decimalfield_value',
        'UUIDField': 'convert_uuidfield_value',
    }
    converters = super(DatabaseOperations, self).get_db_converters(expression)
    method_name = converter_names.get(expression.output_field.get_internal_type())
    if method_name is not None:
        converters.append(getattr(self, method_name))
    return converters
def _prepare_data(self, data):
"""
Prepare data for addition to the tree. If the data is a list or tuple,
it is expected to be of the form (obj, lookup_type, value), where obj
is a Constraint object, and is then slightly munged before being
stored (to avoid storing any reference to field objects). Otherwise,
the 'data' is stored unchanged and can be any class with an 'as_sql()'
method.
"""
if not isinstance(data, (list, tuple)):
return data
obj, lookup_type, value = data
if isinstance(value, collections.Iterator):
# Consume any generators immediately, so that we can determine
# emptiness and transform any non-empty values correctly.
value = list(value)
# The "value_annotation" parameter is used to pass auxiliary information
# about the value(s) to the query construction. Specifically, datetime
# and empty values need special handling. Other types could be used
# here in the future (using Python types is suggested for consistency).
if (isinstance(value, datetime.datetime)
or (isinstance(obj.field, DateTimeField) and lookup_type != 'isnull')):
value_annotation = datetime.datetime
elif hasattr(value, 'value_annotation'):
value_annotation = value.value_annotation
else:
value_annotation = bool(value)
if hasattr(obj, 'prepare'):
value = obj.prepare(lookup_type, value)
return (obj, lookup_type, value_annotation, value)
def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False):
    """
    Return a copy of this expression whose ``col`` is the reference
    resolved from ``self.lookup`` against ``query``.

    Asserts that the resolved column's output field is a ``DateField``
    and, when ``settings.USE_TZ`` is enabled, that it is not a
    ``DateTimeField``.
    """
    clone = self.copy()
    clone.col = query.resolve_ref(self.lookup, allow_joins, reuse, summarize)
    resolved_field = clone.col.output_field
    assert isinstance(resolved_field, fields.DateField), (
        "%r isn't a DateField." % resolved_field.name
    )
    if settings.USE_TZ:
        is_datetime = isinstance(resolved_field, fields.DateTimeField)
        assert not is_datetime, "%r is a DateTimeField, not a DateField." % resolved_field.name
    return clone
def __init__(self, lookup, lookup_type, tzinfo):
    """
    Store the lookup, lookup type and time zone for this expression.

    The parent is initialised with a ``DateTimeField`` output field.
    ``tzname`` is ``None`` when no ``tzinfo`` is given; otherwise it is
    the name reported by ``timezone._get_timezone_name``.
    """
    super(DateTime, self).__init__(output_field=fields.DateTimeField())
    self.lookup = lookup
    self.lookup_type = lookup_type
    self.col = None  # filled in later on the copy made by resolve_expression
    self.tzname = None if tzinfo is None else timezone._get_timezone_name(tzinfo)
    self.tzinfo = tzinfo
def export_selected_data(self, request, queryset):
    """
    Export ``queryset`` as an ``.xls`` attachment.

    The columns are taken from ``self.export_fields`` when set, otherwise
    from ``self.list_display`` or ``self.fields``. The first row holds the
    column titles (the model field's ``verbose_name``, or the
    ``short_description`` of a model attribute, or the raw name); every
    following row holds one object of ``queryset``.

    :param request: the current request; its user agent decides how the
        download file name is encoded
    :param queryset: the objects to export
    :return: an ``HttpResponse`` carrying the workbook
    """
    opts = self.model._meta
    workbook = xlwt.Workbook(encoding='utf-8')
    date_stamp = datetime.date.today().strftime('%Y%m%d')
    file_name = force_text(opts.verbose_name + date_stamp)
    sheet = workbook.add_sheet(force_text(opts.verbose_name))
    export_fields = getattr(self, 'export_fields', None) or self.list_display or self.fields

    # Header row.
    for col_index, field_name in enumerate(export_fields):
        col_name = field_name
        try:
            col_name = opts.get_field(field_name).verbose_name
        except Exception:
            # Not a model field: fall back to a described attribute of the
            # model, and to the raw name when there is no such attribute.
            attr = getattr(self.model, field_name, None)
            if hasattr(attr, 'short_description'):
                col_name = attr.short_description
        sheet.write(0, col_index, force_text(col_name))

    # Data rows, starting right below the header.
    for row_index, obj in enumerate(queryset, 1):
        for col_index, field_name in enumerate(export_fields):
            try:
                model_field = opts.get_field(field_name)
            except Exception:
                model_field = None  # not a model field; no type-specific formatting
            value = getattr(obj, field_name, '')
            if callable(value):
                value = value()
            # DateTimeField is checked before DateField because it is a
            # DateField subclass and needs the more precise format.
            elif isinstance(model_field, fields.DateTimeField):
                # Empty (NULL) values are exported as an empty cell.
                value = value.strftime('%Y-%m-%d %H:%M') if value is not None else ''
            elif isinstance(model_field, fields.DateField):
                value = value.strftime('%Y-%m-%d') if value is not None else ''
            elif isinstance(model_field, fields.CharField) and model_field.choices:
                value = getattr(obj, 'get_' + field_name + '_display')()
            elif isinstance(model_field, related.ForeignKey):
                value = str(value)
            sheet.write(row_index, col_index, value)

    response = HttpResponse(content_type='application/vnd.ms-excel')
    agent = request.META.get('HTTP_USER_AGENT')
    download_name = smart_str(file_name)
    if agent and re.search('MSIE', agent):
        # Internet Explorer gets a URL-quoted file name.
        download_name = urlquote(file_name)
    response['Content-Disposition'] = 'attachment; filename=%s.xls' % download_name
    workbook.save(response)
    return response
def update_local_issue(
    gr_issue,
    id_namespace='',
    service_namespace='',
):
    """
    Create or update the local Issue matching a GeoReport issue.

    The input dict is deep-copied first, so the caller's structure is not
    modified. Keys that match Issue field names are copied onto the issue
    (datetime fields go through ``parse_date``); whatever is left over
    ends up on ``issue.source``.

    :param gr_issue: GeoReportv2 Issue structure (as a dict)
    :param id_namespace: String to prepend to request identifiers
    :param service_namespace: String to prepend to service codes
    :return: The created/updated Issue and a `created` flag
    """
    remaining = deepcopy(gr_issue)
    identifier = remaining.pop('service_request_id')
    if id_namespace:
        identifier = '%s:%s' % (id_namespace, identifier)

    issue = Issue.objects.filter(identifier=identifier).first()
    created = not issue
    if created:
        issue = Issue(identifier=identifier)

    for model_field in Issue._meta.get_fields():
        if model_field.name not in remaining:
            continue
        raw_value = remaining.pop(model_field.name)
        if isinstance(model_field, DateTimeField):
            raw_value = parse_date(raw_value)
        setattr(issue, model_field.attname, raw_value)

    if "long" in remaining and "lat" in remaining:
        point_wkt = 'SRID=4326;POINT(%s %s)' % (remaining.pop('long'), remaining.pop('lat'))
        issue.location = GEOSGeometry(point_wkt)

    if 'service_code' in remaining:
        remaining['service_code'] = '%s%s' % (service_namespace, remaining['service_code'])

    # This has no direct mapping in our schema, but it can be used by implicit autocreation of services
    issue.service_name = remaining.pop('service_name', None)
    issue._cache_data()
    issue.full_clean()
    issue.save()

    extended_attributes = remaining.pop('extended_attributes', {})
    for extension_class in get_extensions():
        extension_class().parse_extended_attributes(issue, extended_attributes)

    if remaining:
        print(remaining)

    # Assigned after save(), so this value is not written by the save above.
    issue.source = remaining
    return (issue, created)
def handle(self, *args, **options):
    """
    Update periodic update events and send notifications for upcoming update events.

    Queues reminders for tasks activated between 10 and 2 days ago, and
    moves tasks without participants that were created at least 10 days
    ago into review, notifying admins about each of them.
    """
    # command to run: python manage.py tunga_manage_task_status

    # A task counts as "activated" at approval time, or at creation time
    # when it has no approval timestamp.
    activation_time = Case(
        When(approved_at__isnull=True, then='created_at'),
        default='approved_at',
        output_field=DateTimeField()
    )
    # Choose tasks that aren't closed or under review already
    open_tasks = Task.objects.filter(
        scope=TASK_SCOPE_TASK, closed=False, review=False
    ).annotate(activated_at=activation_time)

    now = datetime.datetime.utcnow()
    two_days_ago = now - relativedelta(days=2)
    ten_days_ago = now - relativedelta(days=10)

    # Remind admins and devs about approved tasks with no applications 2 days after creation or approval
    # NOTE(review): the filter requires participants__isnull=False, which
    # reads differently from "no applications" -- confirm the intent.
    needing_applications = open_tasks.filter(
        approved=True,
        participants__isnull=False,
        activated_at__range=[ten_days_ago, two_days_ago]
    )
    for task in needing_applications:
        # Admins are reminded first, then devs.
        for notify_admin in (True, False):
            remind_no_task_applications.delay(task.id, admin=notify_admin)

    # Remind admins to take action on tasks with no accepted applications 10 days after creation or approval
    # NOTE(review): this filters on created_at rather than the annotated
    # activated_at -- confirm whether approval time should count here.
    needing_developer = open_tasks.filter(
        participants__isnull=True, created_at__lte=ten_days_ago
    )
    for task in needing_developer:
        # Put task in review
        task.review = True
        task.save()

        # Notify admins to take action
        notify_review_task_admin.delay(task.id)