python类Q的实例源码

subqueries.py 文件源码 项目:lifesoundtrack 作者: MTG 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def update_batch(self, pk_list, values, using):
        """
        Run the update for ``values`` against ``pk_list`` in chunks.

        ``values`` is registered once through ``add_update_values``. The
        primary keys are then walked in slices of at most
        ``GET_ITERATOR_CHUNK_SIZE`` entries, and one statement is executed
        on the ``using`` connection for every slice.
        """
        self.add_update_values(values)
        chunk_starts = range(0, len(pk_list), GET_ITERATOR_CHUNK_SIZE)
        for start in chunk_starts:
            pk_chunk = pk_list[start: start + GET_ITERATOR_CHUNK_SIZE]
            # Reset the WHERE node so the statement is restricted to the
            # current slice only, not to the slices handled before it.
            self.where = self.where_class()
            self.add_q(Q(pk__in=pk_chunk))
            compiler = self.get_compiler(using)
            compiler.execute_sql(NO_RESULTS)
expressions.py 文件源码 项目:lifesoundtrack 作者: MTG 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, condition=None, then=None, **lookups):
        """
        Initialise the clause from a ``Q`` condition or from keyword lookups.

        Exactly one form must be used: either ``condition`` is a ``Q``
        instance, or ``condition`` is omitted and the condition is spelled
        as ``**lookups`` (which are wrapped in a ``Q``). Passing both,
        neither, or a ``condition`` that is not a ``Q`` raises ``TypeError``.

        ``then`` is run through ``_parse_expressions`` and the first parsed
        element is stored as ``self.result``.
        """
        if condition is None and lookups:
            # Keyword form: fold the lookups into a single Q object.
            condition = Q(**lookups)
            lookups = None
        if lookups or not isinstance(condition, Q):
            raise TypeError("__init__() takes either a Q object or lookups as keyword arguments")
        super(When, self).__init__(output_field=None)
        self.condition = condition
        parsed = self._parse_expressions(then)
        self.result = parsed[0]
managers.py 文件源码 项目:django-icekit 作者: ic-labs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def published(self, for_user=UNSET, force_exchange=False):
        """
        Narrow the parent class's ``published()`` result by publication dates.

        When ``for_user`` is given (anything other than ``UNSET``), the result
        of ``self.visible()`` is returned instead. Otherwise items are
        excluded when their publication date lies in the future or their
        publication end date has already been reached.
        """
        if for_user is not UNSET:
            return self.visible()

        parent_qs = super(PublishingUrlNodeQuerySet, self).published(
            for_user=for_user, force_exchange=force_exchange)

        # The date checks must look at the published version of an item,
        # *not* the draft version, or we could get the wrong result.
        # For draft items that means the dates on the linked published copy...
        is_draft = Q(publishing_is_draft=True)
        linked_outside_window = Q(
            Q(publishing_linked__publication_date__gt=now())
            | Q(publishing_linked__publication_end_date__lte=now()))
        parent_qs = parent_qs.exclude(is_draft & linked_outside_window)

        # ...whereas published items carry the date fields themselves.
        is_published = Q(publishing_is_draft=False)
        own_outside_window = Q(
            Q(publication_date__gt=now())
            | Q(publication_end_date__lte=now()))
        return parent_qs.exclude(is_published & own_outside_window)
managers.py 文件源码 项目:django-icekit 作者: ic-labs 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def published(self, for_user=None, force_exchange=True):
        """
        Customise `UrlNodeQuerySet.published()` with publication date
        filtering and the exchange of draft items for published ones.

        Starts from ``self._single_site()``. Unless ``for_user`` is a staff
        user, items are kept only when their publication date is unset or in
        the past and their publication end date is unset or not yet passed.
        With ``force_exchange`` the result is handed to
        ``_exchange_for_published``; otherwise it is filtered to
        ``UrlNode.PUBLISHED`` status.
        """
        qs = self._single_site()
        # In a draft context, when this method is triggered by Fluent (which
        # we recognise by `for_user` being present), do not restrict to
        # published items: privileged users may need the drafts themselves.
        if for_user and is_draft_request_context():
            return qs

        # Staff are exempt from the publication date window.
        if for_user is None or not for_user.is_staff:
            has_started = (
                Q(publication_date__isnull=True) |
                Q(publication_date__lt=now())
            )
            qs = qs.filter(has_started)
            has_not_ended = (
                Q(publication_end_date__isnull=True) |
                Q(publication_end_date__gte=now())
            )
            qs = qs.filter(has_not_ended)

        if not force_exchange:
            return qs.filter(status=UrlNode.PUBLISHED)
        return _exchange_for_published(qs)
views.py 文件源码 项目:djangoSIGE 作者: thiagopena 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_context_data(self, **kwargs):
        """
        Extend the template context with the edited user and permissions.

        Adds three keys: ``user`` (looked up by the ``pk`` URL kwarg),
        ``default_permissions`` (permissions on ``DEFAULT_PERMISSION_MODELS``
        whose codename contains one of the add/change/view/delete markers)
        and ``custom_permissions`` (permissions whose codename is listed in
        ``CUSTOM_PERMISSIONS``).
        """
        context = super(EditarPermissoesUsuarioView,
                        self).get_context_data(**kwargs)
        context['user'] = User.objects.get(pk=self.kwargs['pk'])

        # OR together one case-insensitive "contains" test per marker.
        codename_markers = ('add_', 'change_', 'view_', 'delete_')
        matches_any_marker = reduce(
            operator.or_,
            (Q(codename__icontains=marker) for marker in codename_markers))

        context['default_permissions'] = Permission.objects.filter(
            matches_any_marker,
            content_type__model__in=DEFAULT_PERMISSION_MODELS)
        context['custom_permissions'] = Permission.objects.filter(
            codename__in=CUSTOM_PERMISSIONS)
        return context
views.py 文件源码 项目:online_edu 作者: gordoning 项目源码 文件源码 阅读 51 收藏 0 点赞 0 评论 0
def search_keyword(request):
    """
    Return the courses matching the posted search word as JSON.

    Reads ``word`` from ``request.POST`` and selects the distinct courses
    whose own ``name``, or whose ``search_keywords__course__name`` relation,
    contains it (case-insensitive). The response body is a JSON object of
    the form ``{"keywords": [{"id": ..., "name": ...}, ...]}``.

    When no ``word`` was posted the ``keywords`` list is empty.
    """
    word = request.POST.get('word')

    if word is None:
        # No search term was posted, so there is nothing to match. Answer
        # with an empty list instead of building an ``icontains`` lookup on
        # None (which Django rejects as a query value).
        result_course = []
    else:
        courses = Course.objects.filter(
            Q(search_keywords__course__name__icontains=word)
            | Q(name__icontains=word)
        ).distinct().values("id", "name")
        # Evaluate the lazy queryset into plain dicts for json.dumps.
        result_course = list(courses)

    keywords = {"keywords": result_course}

    return HttpResponse(json.dumps(keywords))
models.py 文件源码 项目:django-binder 作者: CodeYellowBV 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def get_q(self, qualifier, value, invert, partial=''):
        """
        Build the ``Q`` object that filters this field by ``value``.

        ``qualifier`` is the lookup suffix (it may be empty) and is validated
        with ``check_qualifier`` first. For ``in`` and ``range`` the value is
        split on commas and every piece is cleaned separately; ``range``
        requires exactly two pieces, otherwise ``BinderRequestError`` is
        raised. For ``isnull`` the lookup value is always ``True`` and
        ``value`` is not used. ``partial`` is prepended to the field name in
        the lookup key. When ``invert`` is truthy the negated ``Q`` is
        returned.
        """
        self.check_qualifier(qualifier)

        # TODO: Try to make the splitting and cleaning more re-usable
        if qualifier in ('in', 'range'):
            pieces = value.split(',')
            if qualifier == 'range' and len(pieces) != 2:
                raise BinderRequestError('Range requires exactly 2 values for {}.'.format(self.field_description()))
            cleaned_value = [self.clean_value(qualifier, piece) for piece in pieces]
        elif qualifier == 'isnull':
            cleaned_value = True
        else:
            try:
                cleaned_value = self.clean_value(qualifier, value)
            except IndexError:
                raise ValidationError('Value for filter {{{}}}.{{{}}} may not be empty.'.format(self.field.model.__name__, self.field.name))

        suffix = '__' + qualifier if qualifier else ''
        lookup_key = partial + self.field.name + suffix
        q = Q(**{lookup_key: cleaned_value})
        return ~q if invert else q
models.py 文件源码 项目:django-binder 作者: CodeYellowBV 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_q(self, qualifier, value, invert, partial=''):
        """
        Build the ``Q`` object that filters this field by ``value``.

        ``qualifier`` is the lookup suffix (it may be empty) and is validated
        with ``check_qualifier`` first. For ``in`` and ``range`` the value is
        split on commas and every piece is cleaned separately; ``range``
        requires exactly two pieces, otherwise ``BinderRequestError`` is
        raised. For ``isnull`` the lookup value is always ``True`` and
        ``value`` is not used. ``partial`` is prepended to the field name in
        the lookup key. When ``invert`` is truthy the negated ``Q`` is
        returned.
        """
        self.check_qualifier(qualifier)

        # TODO: Try to make the splitting and cleaning more re-usable
        if qualifier in ('in', 'range'):
            pieces = value.split(',')
            if qualifier == 'range' and len(pieces) != 2:
                raise BinderRequestError('Range requires exactly 2 values for {}.'.format(self.field_description()))
            cleaned_value = [self.clean_value(qualifier, piece) for piece in pieces]
        elif qualifier == 'isnull':
            cleaned_value = True
        else:
            try:
                cleaned_value = self.clean_value(qualifier, value)
            except IndexError:
                raise ValidationError('Value for filter {{{}}}.{{{}}} may not be empty.'.format(self.field.model.__name__, self.field.name))

        suffix = '__' + qualifier if qualifier else ''
        lookup_key = partial + self.field.name + suffix
        q = Q(**{lookup_key: cleaned_value})
        return ~q if invert else q
test_resource_manager.py 文件源码 项目:rotest 作者: gregoil 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def test_lock_unavailable_resource_timeout(self):
        """Lock an already locked resource & validate failure after timeout.

        * Validates the DB initial state.
        * Locks an already locked resource, using resource client.
        * Validates a ResourceUnavailableError is raised.
        * Validates 'lock_resources' duration is at least the timeout.
        """
        # A resource is treated as locked when its owner is not empty.
        resources_num = DemoResourceData.objects.filter(~Q(owner=""),
                                  name=self.LOCKED1_NAME).count()

        # assertEqual, not the deprecated assertEquals alias (removed from
        # unittest in Python 3.12).
        self.assertEqual(resources_num, 1, "Expected 1 locked "
                         "resource with name %r in DB found %d"
                         % (self.LOCKED1_NAME, resources_num))

        descriptor = Descriptor(DemoResource, name=self.LOCKED1_NAME)

        # Time the failing lock attempt to check it waited for the timeout.
        start_time = time.time()
        self.assertRaises(ResourceUnavailableError,
                          self.client._lock_resources,
                          descriptors=[descriptor],
                          timeout=self.LOCK_TIMEOUT)

        duration = time.time() - start_time
        self.assertGreaterEqual(duration, self.LOCK_TIMEOUT, "Waiting for "
                                "resources took %.2f seconds, but should take "
                                "at least %d" % (duration, self.LOCK_TIMEOUT))
test_resource_manager.py 文件源码 项目:rotest 作者: gregoil 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def test_lock_multiple_matches(self):
        """Lock a resource, parameters matching more than one result.

        * Validates the DB initial state.
        * Locks a resource using parameters that match more than one resource,
          using resource client.
        * Validates only one resource returned.
        * Validates the returned resource is now marked as locked.
        * Validates there is still at least 1 available resource with the
          same parameters.
        """
        common_parameters = {'ip_address': "1.1.1.1"}
        # Available resources are the ones with an empty owner.
        resources_num = DemoResourceData.objects.filter(owner="",
                                                **common_parameters).count()

        # assertEqual, not the deprecated assertEquals alias (removed from
        # unittest in Python 3.12).
        self.assertEqual(resources_num, 2, "Expected 2 available "
                         "resources with parameters %r in DB found %d"
                         % (common_parameters, resources_num))

        descriptor = Descriptor(DemoResource, **common_parameters)
        resources = self.client._lock_resources(descriptors=[descriptor],
                                                timeout=self.LOCK_TIMEOUT)

        resources_num = len(resources)
        self.assertEqual(resources_num, 1, "Expected list with 1 "
                         "resource in it but found %d" % resources_num)

        locked_resource_name = resources[0].name

        # The returned resource must now have a non-empty owner in the DB.
        resources_num = descriptor.type.DATA_CLASS.objects.filter(~Q(owner=""),
                                  name=locked_resource_name).count()

        self.assertEqual(resources_num, 1, "Expected 1 locked "
                         "resource with name %r in DB, found %d"
                         % (locked_resource_name, resources_num))

        resources_num = descriptor.type.DATA_CLASS.objects.filter(owner="",
                                    **common_parameters).count()

        self.assertGreaterEqual(resources_num, 1, "Expected at least 1 "
                                "available resource with the same parameters "
                                "in DB found %d" % resources_num)
placeholder.py 文件源码 项目:DjangoCMS 作者: farhan711 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_copy_languages(self, placeholder, model, fieldname, **kwargs):
        """
        List the other languages whose plugins could be copied from.

        Looks up the ``model`` instance whose ``fieldname`` is
        ``placeholder``, then finds the other instances sharing its
        ``master`` whose placeholder in ``fieldname`` has at least one
        plugin attached.

        Returns a list of ``(language_code, language_name)`` tuples, with
        names taken from ``settings.LANGUAGES``. A language code missing
        from ``settings.LANGUAGES`` raises ``KeyError``.
        """
        manager = model.objects
        src = manager.get(**{fieldname: placeholder})
        query = Q(master=src.master)
        query &= Q(**{'%s__cmsplugin__isnull' % fieldname: False})
        # Never offer the source instance itself.
        query &= ~Q(pk=src.pk)

        language_codes = manager.filter(query).values_list('language_code', flat=True).distinct()
        # Build the code -> name mapping once rather than per language code.
        language_names = dict(settings.LANGUAGES)
        return [(lc, language_names[lc]) for lc in language_codes]
managers.py 文件源码 项目:fluentcms-publishing 作者: bashu 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def published(self, for_user=UNSET, force_exchange=False):
        """
        Narrow the parent class's ``published()`` result by publication dates.

        When ``for_user`` is given (anything other than ``UNSET``), the result
        of ``self.visible()`` is returned instead. Otherwise items are
        excluded when their publication date lies in the future or their
        publication end date has already been reached.
        """
        if for_user is not UNSET:
            return self.visible()

        parent_qs = super(PublishingUrlNodeQuerySet, self).published(
            for_user=for_user, force_exchange=force_exchange)

        # The date checks must look at the published version of an item,
        # *not* the draft version, or we could get the wrong result.
        # For draft items that means the dates on the linked published copy...
        is_draft = Q(publishing_is_draft=True)
        linked_outside_window = Q(
            Q(publishing_linked__publication_date__gt=now())
            | Q(publishing_linked__publication_end_date__lte=now()))
        parent_qs = parent_qs.exclude(is_draft & linked_outside_window)

        # ...whereas published items carry the date fields themselves.
        is_published = Q(publishing_is_draft=False)
        own_outside_window = Q(
            Q(publication_date__gt=now())
            | Q(publication_end_date__lte=now()))
        return parent_qs.exclude(is_published & own_outside_window)
managers.py 文件源码 项目:fluentcms-publishing 作者: bashu 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def published(self, for_user=None, force_exchange=True):
        """
        Customise `UrlNodeQuerySet.published()` with publication date
        filtering and the exchange of draft items for published ones.

        Starts from ``self._single_site()``. Unless ``for_user`` is a staff
        user, items are kept only when their publication date is unset or in
        the past and their publication end date is unset or not yet passed.
        With ``force_exchange`` the result is handed to
        ``_exchange_for_published``; otherwise it is filtered to
        ``UrlNode.PUBLISHED`` status.
        """
        qs = self._single_site()
        # In a draft context, when this method is triggered by Fluent (which
        # we recognise by `for_user` being present), do not restrict to
        # published items: privileged users may need the drafts themselves.
        if for_user and is_draft_request_context():
            return qs

        # Staff are exempt from the publication date window.
        if for_user is None or not for_user.is_staff:
            has_started = (
                Q(publication_date__isnull=True) |
                Q(publication_date__lt=now())
            )
            qs = qs.filter(has_started)
            has_not_ended = (
                Q(publication_end_date__isnull=True) |
                Q(publication_end_date__gte=now())
            )
            qs = qs.filter(has_not_ended)

        if not force_exchange:
            return qs.filter(status=UrlNode.PUBLISHED)
        return _exchange_for_published(qs)
query.py 文件源码 项目:liberator 作者: libscie 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _filter_or_exclude(self, negate, *args, **kwargs):
        """
        Return a clone of this queryset with an extra condition added.

        The positional and keyword arguments are combined into one ``Q``
        object, which is added to the clone's query as-is, or negated when
        ``negate`` is truthy. If any arguments are given, the query must
        still be filterable (the assertion fails once a slice was taken).
        """
        has_conditions = bool(args or kwargs)
        if has_conditions:
            assert self.query.can_filter(), \
                "Cannot filter a query once a slice has been taken."

        clone = self._clone()
        condition = Q(*args, **kwargs)
        clone.query.add_q(~condition if negate else condition)
        return clone
subqueries.py 文件源码 项目:liberator 作者: libscie 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def delete_qs(self, query, using):
        """
        Delete the rows selected by ``query``, in one SQL statement if possible.

        When the inner query uses only the base table, its WHERE clause is
        reused directly. Otherwise the delete is restricted through
        ``pk__in``: with the inner query itself as a subquery when the
        backend reports ``update_can_self_select``, or else by fetching the
        primary keys first and delegating to ``delete_batch``.

        Returns the cursor's ``rowcount`` (0 when there is no cursor or
        nothing to delete), or whatever ``delete_batch`` returns.
        """
        innerq = query.query
        # Both the inner query and this query need at least one table in use.
        innerq.get_initial_alias()
        self.get_initial_alias()
        used_tables = [alias for alias in innerq.tables
                       if innerq.alias_refcount[alias]]

        if used_tables and used_tables != self.tables:
            # More than the base table is involved: go through primary keys.
            pk = query.model._meta.pk
            if connections[using].features.update_can_self_select:
                innerq.clear_select_clause()
                innerq.select = [
                    pk.get_col(self.get_initial_alias())
                ]
                values = innerq
            else:
                # We can't do the delete using a subquery on this backend.
                values = list(query.values_list('pk', flat=True))
                if not values:
                    return 0
                return self.delete_batch(values, using)
            self.where = self.where_class()
            self.add_q(Q(pk__in=values))
        else:
            # There is only the base table in use in the query.
            self.where = innerq.where

        cursor = self.get_compiler(using).execute_sql(CURSOR)
        return cursor.rowcount if cursor else 0
subqueries.py 文件源码 项目:liberator 作者: libscie 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def update_batch(self, pk_list, values, using):
        """
        Run the update for ``values`` against ``pk_list`` in chunks.

        ``values`` is registered once through ``add_update_values``. The
        primary keys are then walked in slices of at most
        ``GET_ITERATOR_CHUNK_SIZE`` entries, and one statement is executed
        on the ``using`` connection for every slice.
        """
        self.add_update_values(values)
        chunk_starts = range(0, len(pk_list), GET_ITERATOR_CHUNK_SIZE)
        for start in chunk_starts:
            pk_chunk = pk_list[start: start + GET_ITERATOR_CHUNK_SIZE]
            # Reset the WHERE node so the statement is restricted to the
            # current slice only, not to the slices handled before it.
            self.where = self.where_class()
            self.add_q(Q(pk__in=pk_chunk))
            compiler = self.get_compiler(using)
            compiler.execute_sql(NO_RESULTS)
placeholder.py 文件源码 项目:django-cms-articles 作者: misli 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_copy_languages(self, placeholder, model, fieldname, **kwargs):
        """
        List the other languages whose plugins could be copied from.

        Looks up the ``model`` instance whose ``fieldname`` is
        ``placeholder``, then finds the other instances sharing its
        ``master`` whose placeholder in ``fieldname`` has at least one
        plugin attached.

        Returns a list of ``(language_code, language_name)`` tuples, with
        names taken from ``settings.LANGUAGES``. A language code missing
        from ``settings.LANGUAGES`` raises ``KeyError``.
        """
        manager = model.objects
        src = manager.get(**{fieldname: placeholder})
        query = Q(master=src.master)
        query &= Q(**{'%s__cmsplugin__isnull' % fieldname: False})
        # Never offer the source instance itself.
        query &= ~Q(pk=src.pk)

        language_codes = manager.filter(query).values_list('language_code', flat=True).distinct()
        # Build the code -> name mapping once rather than per language code.
        language_names = dict(settings.LANGUAGES)
        return [(lc, language_names[lc]) for lc in language_codes]
subqueries.py 文件源码 项目:djanoDoc 作者: JustinChavez 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def delete_qs(self, query, using):
        """
        Delete the rows selected by ``query``, in one SQL statement if possible.

        When the inner query uses only the base table, its WHERE clause is
        reused directly. Otherwise the delete is restricted through
        ``pk__in``: with the inner query itself as a subquery when the
        backend reports ``update_can_self_select``, or else by fetching the
        primary keys first and delegating to ``delete_batch``.

        Returns the cursor's ``rowcount`` (0 when there is no cursor or
        nothing to delete), or whatever ``delete_batch`` returns.
        """
        innerq = query.query
        # Both the inner query and this query need at least one table in use.
        innerq.get_initial_alias()
        self.get_initial_alias()
        used_tables = [alias for alias in innerq.tables
                       if innerq.alias_refcount[alias]]

        if used_tables and used_tables != self.tables:
            # More than the base table is involved: go through primary keys.
            pk = query.model._meta.pk
            if connections[using].features.update_can_self_select:
                innerq.clear_select_clause()
                innerq.select = [
                    pk.get_col(self.get_initial_alias())
                ]
                values = innerq
            else:
                # We can't do the delete using a subquery on this backend.
                values = list(query.values_list('pk', flat=True))
                if not values:
                    return 0
                return self.delete_batch(values, using)
            self.where = self.where_class()
            self.add_q(Q(pk__in=values))
        else:
            # There is only the base table in use in the query.
            self.where = innerq.where

        cursor = self.get_compiler(using).execute_sql(CURSOR)
        return cursor.rowcount if cursor else 0
subqueries.py 文件源码 项目:djanoDoc 作者: JustinChavez 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def update_batch(self, pk_list, values, using):
        """
        Run the update for ``values`` against ``pk_list`` in chunks.

        ``values`` is registered once through ``add_update_values``. The
        primary keys are then walked in slices of at most
        ``GET_ITERATOR_CHUNK_SIZE`` entries, and one statement is executed
        on the ``using`` connection for every slice.
        """
        self.add_update_values(values)
        chunk_starts = range(0, len(pk_list), GET_ITERATOR_CHUNK_SIZE)
        for start in chunk_starts:
            pk_chunk = pk_list[start: start + GET_ITERATOR_CHUNK_SIZE]
            # Reset the WHERE node so the statement is restricted to the
            # current slice only, not to the slices handled before it.
            self.where = self.where_class()
            self.add_q(Q(pk__in=pk_chunk))
            compiler = self.get_compiler(using)
            compiler.execute_sql(NO_RESULTS)
expressions.py 文件源码 项目:djanoDoc 作者: JustinChavez 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, condition=None, then=None, **lookups):
        """
        Initialise the clause from a ``Q`` condition or from keyword lookups.

        Exactly one form must be used: either ``condition`` is a ``Q``
        instance, or ``condition`` is omitted and the condition is spelled
        as ``**lookups`` (which are wrapped in a ``Q``). Passing both,
        neither, or a ``condition`` that is not a ``Q`` raises ``TypeError``.

        ``then`` is run through ``_parse_expressions`` and the first parsed
        element is stored as ``self.result``.
        """
        if condition is None and lookups:
            # Keyword form: fold the lookups into a single Q object.
            condition = Q(**lookups)
            lookups = None
        if lookups or not isinstance(condition, Q):
            raise TypeError("__init__() takes either a Q object or lookups as keyword arguments")
        super(When, self).__init__(output_field=None)
        self.condition = condition
        parsed = self._parse_expressions(then)
        self.result = parsed[0]


问题


面经


文章

微信
公众号

扫码关注公众号