python类DateTimeField()的实例源码

lookups.py 文件源码 项目:lifesoundtrack 作者: MTG 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def year_lookup_bounds(self, connection, year):
        """Delegate year-bound computation to the database backend.

        The backend helper is chosen by the type of the output field of the
        expression wrapped two levels down (``self.lhs.lhs``): the datetime
        helper for a ``DateTimeField``, the date helper for anything else.
        Whatever the helper returns is returned unchanged.
        """
        target_field = self.lhs.lhs.output_field
        if isinstance(target_field, DateTimeField):
            return connection.ops.year_lookup_bounds_for_datetime_field(year)
        return connection.ops.year_lookup_bounds_for_date_field(year)
base.py 文件源码 项目:liberator 作者: libscie 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self, output_field=None, **extra):
        """Initialise the expression, defaulting the output field.

        When the caller passes no ``output_field`` a fresh
        ``fields.DateTimeField()`` is used. All other keyword arguments are
        forwarded to the parent initialiser untouched.
        """
        resolved_field = fields.DateTimeField() if output_field is None else output_field
        super(Now, self).__init__(output_field=resolved_field, **extra)
operations.py 文件源码 项目:djanoDoc 作者: JustinChavez 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_db_converters(self, expression):
        """Return the parent's converters plus at most one backend converter.

        The extra converter is one of this object's ``convert_*_value``
        methods, selected by the internal type name reported by the
        expression's output field. Unlisted types add nothing.
        """
        converters = super(DatabaseOperations, self).get_db_converters(expression)
        # Method *names* are stored (not bound methods) so that only the
        # converter actually needed is looked up on ``self``.
        converter_names = {
            'DateTimeField': 'convert_datetimefield_value',
            'DateField': 'convert_datefield_value',
            'TimeField': 'convert_timefield_value',
            'DecimalField': 'convert_decimalfield_value',
            'UUIDField': 'convert_uuidfield_value',
        }
        method_name = converter_names.get(expression.output_field.get_internal_type())
        if method_name is not None:
            converters.append(getattr(self, method_name))
        return converters
expressions.py 文件源码 项目:djanoDoc 作者: JustinChavez 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False):
        """Return a copy of this expression bound to a column of ``query``.

        ``self.lookup`` is resolved through ``query.resolve_ref`` and stored
        on the copy as ``col``. The resolved column's output field must be a
        ``DateField``; when ``settings.USE_TZ`` is on it must also not be a
        ``DateTimeField``. Both checks are ``assert`` statements, so they are
        skipped when Python runs with ``-O``. ``for_save`` is accepted but
        not used here.
        """
        clone = self.copy()
        clone.col = query.resolve_ref(self.lookup, allow_joins, reuse, summarize)
        resolved_field = clone.col.output_field
        assert isinstance(resolved_field, fields.DateField), (
            "%r isn't a DateField." % resolved_field.name
        )
        if settings.USE_TZ:
            assert not isinstance(resolved_field, fields.DateTimeField), (
                "%r is a DateTimeField, not a DateField." % resolved_field.name
            )
        return clone
expressions.py 文件源码 项目:djanoDoc 作者: JustinChavez 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, lookup, lookup_type, tzinfo):
        """Record the lookup details and time zone for this expression.

        The parent is initialised with a ``fields.DateTimeField()`` output
        field and ``col`` starts out as ``None``. ``tzname`` is ``None`` when
        no ``tzinfo`` is given, otherwise whatever
        ``timezone._get_timezone_name`` returns for it.
        """
        super(DateTime, self).__init__(output_field=fields.DateTimeField())
        self.lookup, self.col = lookup, None
        self.lookup_type = lookup_type
        self.tzname = None if tzinfo is None else timezone._get_timezone_name(tzinfo)
        self.tzinfo = tzinfo
expressions.py 文件源码 项目:djanoDoc 作者: JustinChavez 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False):
        """Return a copy of this expression bound to a column of ``query``.

        ``self.lookup`` is resolved through ``query.resolve_ref`` and stored
        on the copy as ``col``. The resolved column's output field must be a
        ``DateTimeField``; the check is an ``assert`` and is therefore
        skipped under ``-O``. ``for_save`` is accepted but not used here.
        """
        clone = self.copy()
        clone.col = query.resolve_ref(self.lookup, allow_joins, reuse, summarize)
        resolved_field = clone.col.output_field
        assert isinstance(resolved_field, fields.DateTimeField), (
            "%r isn't a DateTimeField." % resolved_field.name
        )
        return clone
views.py 文件源码 项目:data-hub-backend 作者: uktrade-attic 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def korben_view(request, model):
    """Create or update a ``model`` row from the request payload.

    The row is looked up by ``request.data['id']``. If it exists, every
    payload key is set as an attribute on it; otherwise a new instance is
    built from the payload. Values on ``DateTimeField`` fields are then run
    through ``parse_date``. A parse failure on a non-nullable field ends the
    request with HTTP 400 and the payload echoed back; on a nullable field
    the value is left as it was. The object is saved with ``as_korben=True``.
    """
    payload = request.data
    try:
        instance = model.objects.get(pk=payload['id'])
        for attr_name, attr_value in payload.items():
            setattr(instance, attr_name, attr_value)
    except model.DoesNotExist:
        instance = model(**payload)

    # Replace the raw values held on datetime fields with parsed objects.
    datetime_fields = (
        candidate for candidate in instance._meta.fields
        if isinstance(candidate, DateTimeField)
    )
    for dt_field in datetime_fields:
        try:
            parsed = parse_date(getattr(instance, dt_field.name, None))
            setattr(instance, dt_field.name, parsed)
        except (ValueError, AttributeError):
            # An unparseable value is tolerated only when the field is nullable.
            if not dt_field.null:
                return Response(data=payload, status=HTTP_400_BAD_REQUEST)

    # Data comes from Korben, so the save is flagged to skip validation.
    instance.save(as_korben=True)

    return Response(data={'message': 'OK'})
operations.py 文件源码 项目:django-next-train 作者: bitpixdigital 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_db_converters(self, expression):
        """Return the parent's converters plus at most one backend converter.

        The extra converter is one of this object's ``convert_*_value``
        methods, selected by the internal type name reported by the
        expression's output field. Unlisted types add nothing.
        """
        converters = super(DatabaseOperations, self).get_db_converters(expression)
        # Method *names* are stored (not bound methods) so that only the
        # converter actually needed is looked up on ``self``.
        converter_names = {
            'DateTimeField': 'convert_datetimefield_value',
            'DateField': 'convert_datefield_value',
            'TimeField': 'convert_timefield_value',
            'DecimalField': 'convert_decimalfield_value',
            'UUIDField': 'convert_uuidfield_value',
        }
        method_name = converter_names.get(expression.output_field.get_internal_type())
        if method_name is not None:
            converters.append(getattr(self, method_name))
        return converters
expressions.py 文件源码 项目:django-next-train 作者: bitpixdigital 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False):
        """Return a copy of this expression bound to a column of ``query``.

        ``self.lookup`` is resolved through ``query.resolve_ref`` and stored
        on the copy as ``col``. The resolved column's output field must be a
        ``DateField``; when ``settings.USE_TZ`` is on it must also not be a
        ``DateTimeField``. Both checks are ``assert`` statements, so they are
        skipped when Python runs with ``-O``. ``for_save`` is accepted but
        not used here.
        """
        clone = self.copy()
        clone.col = query.resolve_ref(self.lookup, allow_joins, reuse, summarize)
        resolved_field = clone.col.output_field
        assert isinstance(resolved_field, fields.DateField), (
            "%r isn't a DateField." % resolved_field.name
        )
        if settings.USE_TZ:
            assert not isinstance(resolved_field, fields.DateTimeField), (
                "%r is a DateTimeField, not a DateField." % resolved_field.name
            )
        return clone
expressions.py 文件源码 项目:django-next-train 作者: bitpixdigital 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, lookup, lookup_type, tzinfo):
        """Record the lookup details and time zone for this expression.

        The parent is initialised with a ``fields.DateTimeField()`` output
        field and ``col`` starts out as ``None``. ``tzname`` is ``None`` when
        no ``tzinfo`` is given, otherwise whatever
        ``timezone._get_timezone_name`` returns for it.
        """
        super(DateTime, self).__init__(output_field=fields.DateTimeField())
        self.lookup, self.col = lookup, None
        self.lookup_type = lookup_type
        self.tzname = None if tzinfo is None else timezone._get_timezone_name(tzinfo)
        self.tzinfo = tzinfo
expressions.py 文件源码 项目:django-next-train 作者: bitpixdigital 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False):
        """Return a copy of this expression bound to a column of ``query``.

        ``self.lookup`` is resolved through ``query.resolve_ref`` and stored
        on the copy as ``col``. The resolved column's output field must be a
        ``DateTimeField``; the check is an ``assert`` and is therefore
        skipped under ``-O``. ``for_save`` is accepted but not used here.
        """
        clone = self.copy()
        clone.col = query.resolve_ref(self.lookup, allow_joins, reuse, summarize)
        resolved_field = clone.col.output_field
        assert isinstance(resolved_field, fields.DateTimeField), (
            "%r isn't a DateTimeField." % resolved_field.name
        )
        return clone
base.py 文件源码 项目:LatinSounds_AppEnviaMail 作者: G3ek-aR 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, output_field=None, **extra):
        """Initialise the expression, defaulting the output field.

        When the caller passes no ``output_field`` a fresh
        ``fields.DateTimeField()`` is used. All other keyword arguments are
        forwarded to the parent initialiser untouched.
        """
        resolved_field = fields.DateTimeField() if output_field is None else output_field
        super(Now, self).__init__(output_field=resolved_field, **extra)
operations.py 文件源码 项目:django-wechat-api 作者: crazy-canux 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def check_expression_support(self, expression):
        """Reject Sum/Avg/Variance/StdDev aggregates over date/time fields.

        Only expressions that are instances of those four aggregate classes
        are inspected. For them, the output field of ``input_field`` is
        looked up; a ``FieldError`` during that lookup is ignored.

        Raises:
            NotImplementedError: the aggregate's input resolves to a
                ``DateField``, ``DateTimeField`` or ``TimeField``.
        """
        temporal_fields = (fields.DateField, fields.DateTimeField, fields.TimeField)
        unsupported_aggregates = (aggregates.Sum, aggregates.Avg, aggregates.Variance, aggregates.StdDev)
        if not isinstance(expression, unsupported_aggregates):
            return
        try:
            source_field = expression.input_field.output_field
        except FieldError:
            # Not every sub-expression has an output_field, which is fine
            # to ignore.
            return
        if isinstance(source_field, temporal_fields):
            raise NotImplementedError(
                'You cannot use Sum, Avg, StdDev and Variance aggregations '
                'on date/time fields in sqlite3 '
                'since date/time is saved as text.')
operations.py 文件源码 项目:django-wechat-api 作者: crazy-canux 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_db_converters(self, expression):
        """Return the parent's converters plus at most one backend converter.

        The extra converter is one of this object's ``convert_*_value``
        methods, selected by the internal type name reported by the
        expression's output field. Unlisted types add nothing.
        """
        converters = super(DatabaseOperations, self).get_db_converters(expression)
        # Method *names* are stored (not bound methods) so that only the
        # converter actually needed is looked up on ``self``.
        converter_names = {
            'DateTimeField': 'convert_datetimefield_value',
            'DateField': 'convert_datefield_value',
            'TimeField': 'convert_timefield_value',
            'DecimalField': 'convert_decimalfield_value',
            'UUIDField': 'convert_uuidfield_value',
        }
        method_name = converter_names.get(expression.output_field.get_internal_type())
        if method_name is not None:
            converters.append(getattr(self, method_name))
        return converters
where.py 文件源码 项目:django-wechat-api 作者: crazy-canux 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _prepare_data(self, data):
        """
        Prepare data for addition to the tree. If the data is a list or tuple,
        it is expected to be of the form (obj, lookup_type, value), where obj
        is a Constraint object, and is then slightly munged before being
        stored (to avoid storing any reference to field objects). Otherwise,
        the 'data' is stored unchanged and can be any class with an 'as_sql()'
        method.
        """
        if not isinstance(data, (list, tuple)):
            return data
        obj, lookup_type, value = data
        if isinstance(value, collections.Iterator):
            # Consume any generators immediately, so that we can determine
            # emptiness and transform any non-empty values correctly.
            value = list(value)

        # The "value_annotation" parameter is used to pass auxiliary information
        # about the value(s) to the query construction. Specifically, datetime
        # and empty values need special handling. Other types could be used
        # here in the future (using Python types is suggested for consistency).
        if (isinstance(value, datetime.datetime)
                or (isinstance(obj.field, DateTimeField) and lookup_type != 'isnull')):
            value_annotation = datetime.datetime
        elif hasattr(value, 'value_annotation'):
            value_annotation = value.value_annotation
        else:
            value_annotation = bool(value)

        if hasattr(obj, 'prepare'):
            value = obj.prepare(lookup_type, value)
        return (obj, lookup_type, value_annotation, value)
expressions.py 文件源码 项目:django-wechat-api 作者: crazy-canux 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False):
        """Return a copy of this expression bound to a column of ``query``.

        ``self.lookup`` is resolved through ``query.resolve_ref`` and stored
        on the copy as ``col``. The resolved column's output field must be a
        ``DateField``; when ``settings.USE_TZ`` is on it must also not be a
        ``DateTimeField``. Both checks are ``assert`` statements, so they are
        skipped when Python runs with ``-O``. ``for_save`` is accepted but
        not used here.
        """
        clone = self.copy()
        clone.col = query.resolve_ref(self.lookup, allow_joins, reuse, summarize)
        resolved_field = clone.col.output_field
        assert isinstance(resolved_field, fields.DateField), (
            "%r isn't a DateField." % resolved_field.name
        )
        if settings.USE_TZ:
            assert not isinstance(resolved_field, fields.DateTimeField), (
                "%r is a DateTimeField, not a DateField." % resolved_field.name
            )
        return clone
expressions.py 文件源码 项目:django-wechat-api 作者: crazy-canux 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, lookup, lookup_type, tzinfo):
        """Record the lookup details and time zone for this expression.

        The parent is initialised with a ``fields.DateTimeField()`` output
        field and ``col`` starts out as ``None``. ``tzname`` is ``None`` when
        no ``tzinfo`` is given, otherwise whatever
        ``timezone._get_timezone_name`` returns for it.
        """
        super(DateTime, self).__init__(output_field=fields.DateTimeField())
        self.lookup, self.col = lookup, None
        self.lookup_type = lookup_type
        self.tzname = None if tzinfo is None else timezone._get_timezone_name(tzinfo)
        self.tzinfo = tzinfo
generic.py 文件源码 项目:Django-ERP 作者: andyzsf 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def export_selected_data(self,request,queryset):
        """Export the rows of ``queryset`` as an ``.xls`` attachment.

        Columns come from ``self.export_fields`` when set, otherwise
        ``self.list_display``, otherwise ``self.fields``. Row 0 holds the
        headers: a model field's ``verbose_name``, else a model attribute's
        ``short_description``, else the raw column name. The file name is the
        model's verbose name followed by today's date (``YYYYMMDD``).

        Cell values are converted as follows:

        * callables are called with no arguments;
        * date / datetime fields are formatted as text (empty when ``None``);
        * char fields with choices use ``get_<field>_display()``;
        * foreign keys are rendered with ``str()``.

        :param request: the current request; its user agent decides how the
            file name is encoded in the ``Content-Disposition`` header
        :param queryset: the objects to export
        :return: an ``HttpResponse`` carrying the workbook
        """
        ops = self.model._meta
        workbook = xlwt.Workbook(encoding='utf-8')
        dd = datetime.date.today().strftime('%Y%m%d')
        file_name = force_text(ops.verbose_name+dd)
        sheet = workbook.add_sheet(force_text(ops.verbose_name))
        obj_fields = getattr(self,'export_fields',None) or self.list_display or self.fields

        # Header row.
        for head_col_index, field in enumerate(obj_fields):
            col_name = field
            try:
                col_name = ops.get_field(field).verbose_name
            except Exception:
                # Not a model field. Fall back to an attribute of the model;
                # a name the model does not have keeps the raw column name
                # instead of raising AttributeError.
                f = getattr(self.model, field, None)
                if hasattr(f, 'short_description'):
                    col_name = f.short_description
            sheet.write(0, head_col_index, force_text(col_name))

        # Data rows start right below the header.
        for row_index, obj in enumerate(queryset, 1):
            for col_index, field in enumerate(obj_fields):
                try:
                    f = ops.get_field(field)
                except Exception:
                    f = field
                v = getattr(obj, field, '')
                if callable(v):
                    v = v()
                # DateTimeField subclasses DateField, so it is tested first.
                elif isinstance(f, fields.DateTimeField):
                    # Nullable columns may hold None, which has no strftime.
                    v = v.strftime('%Y-%m-%d %H:%M') if v is not None else ''
                elif isinstance(f, fields.DateField):
                    v = v.strftime('%Y-%m-%d') if v is not None else ''
                elif isinstance(f, fields.CharField) and f.choices:
                    v = getattr(obj, 'get_' + field + '_display')()
                elif isinstance(f, related.ForeignKey):
                    v = str(v)
                sheet.write(row_index, col_index, v)

        response = HttpResponse(content_type='application/vnd.ms-excel')
        agent = request.META.get('HTTP_USER_AGENT')
        nn = smart_str(file_name)
        # File names are URL-quoted when the user agent contains "MSIE".
        if agent and re.search('MSIE', agent):
            nn = urlquote(file_name)
        response['Content-Disposition'] = 'attachment; filename=%s.xls' % nn
        workbook.save(response)
        return response
down.py 文件源码 项目:issue-reporting 作者: 6aika 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def update_local_issue(
    gr_issue,
    id_namespace='',
    service_namespace='',
):
    """
    Create or update the local Issue that matches a GeoReport v2 issue.

    The input dict is deep-copied and consumed key by key; whatever is left
    over at the end is printed and stored on ``issue.source``.

    :param gr_issue: GeoReportv2 Issue structure (as a dict)
    :param id_namespace: String to prepend to request identifiers
    :param service_namespace: String to prepend to service codes
    :return: The created/updated Issue and a `created` flag
    """

    remaining = deepcopy(gr_issue)
    identifier = remaining.pop('service_request_id')
    if id_namespace:
        identifier = '%s:%s' % (id_namespace, identifier)

    issue = Issue.objects.filter(identifier=identifier).first()
    created = not issue
    if created:
        issue = Issue(identifier=identifier)

    # Copy every key that names a model field onto the issue.
    for model_field in Issue._meta.get_fields():
        if model_field.name not in remaining:
            continue
        raw_value = remaining.pop(model_field.name)
        if isinstance(model_field, DateTimeField):
            raw_value = parse_date(raw_value)
        setattr(issue, model_field.attname, raw_value)

    if "long" in remaining and "lat" in remaining:
        longitude = remaining.pop('long')
        latitude = remaining.pop('lat')
        issue.location = GEOSGeometry(
            'SRID=4326;POINT(%s %s)' % (longitude, latitude)
        )

    if 'service_code' in remaining:
        remaining['service_code'] = '%s%s' % (service_namespace, remaining['service_code'])

    # This has no direct mapping in our schema, but it can be used by implicit autocreation of services
    issue.service_name = remaining.pop('service_name', None)
    issue._cache_data()
    issue.full_clean()
    issue.save()

    # Extensions see the saved issue plus the extended attributes.
    extended_attributes = remaining.pop('extended_attributes', {})
    for extension_class in get_extensions():
        extension_class().parse_extended_attributes(issue, extended_attributes)

    if remaining:
        print(remaining)
    issue.source = remaining
    return (issue, created)
tunga_manage_task_status.py 文件源码 项目:tunga-api 作者: tunga-io 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def handle(self, *args, **options):
        """
        Update periodic update events and send notifications for upcoming update events.

        Two passes run over tasks of scope ``TASK_SCOPE_TASK`` that are
        neither closed nor in review:

        1. Approved tasks whose ``activated_at`` (``created_at`` when
           ``approved_at`` is null, else ``approved_at``) lies between 10 and
           2 days ago get a reminder queued for admins and one for devs.
        2. Tasks with ``participants__isnull=True`` created at least 10 days
           ago are put in review, saved, and admins are notified.
        """
        # command to run: python manage.py tunga_manage_task_status

        # Choose tasks that aren't closed or under review already
        candidate_tasks = Task.objects.filter(
            scope=TASK_SCOPE_TASK, closed=False, review=False
        )
        candidate_tasks = candidate_tasks.annotate(
            activated_at=Case(
                When(approved_at__isnull=True, then='created_at'),
                default='approved_at',
                output_field=DateTimeField()
            )
        )

        now_utc = datetime.datetime.utcnow()
        two_days_ago = now_utc - relativedelta(days=2)
        ten_days_ago = now_utc - relativedelta(days=10)

        # Remind admins and devs about approved tasks with no applications 2 days after creation or approval
        # NOTE(review): the filter uses participants__isnull=False, which
        # reads as the opposite of "no applications" -- confirm intent.
        needs_application_reminder = candidate_tasks.filter(
            approved=True,
            participants__isnull=False,
            activated_at__range=[ten_days_ago, two_days_ago]
        )
        for task in needs_application_reminder:
            # Admins first, then devs.
            for to_admin in (True, False):
                remind_no_task_applications.delay(task.id, admin=to_admin)

        # Remind admins to take action on tasks with no accepted applications 10 days after creation or approval
        # NOTE(review): this filters on created_at, not activated_at -- confirm.
        needs_review = candidate_tasks.filter(
            participants__isnull=True, created_at__lte=ten_days_ago
        )
        for task in needs_review:
            # Put task in review
            task.review = True
            task.save()

            # Notify admins to take action
            notify_review_task_admin.delay(task.id)


问题


面经


文章

微信
公众号

扫码关注公众号