python类Q的实例源码

subqueries.py 文件源码 项目:CodingDojo 作者: ComputerSocietyUNB 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def delete_batch(self, pk_list, using, field=None):
        """
        Set up and execute delete queries for all the objects in pk_list.

        More than one physical query may be executed if there are a
        lot of values in pk_list.
        """
        # number of objects deleted
        num_deleted = 0
        if not field:
            field = self.get_meta().pk
        for offset in range(0, len(pk_list), GET_ITERATOR_CHUNK_SIZE):
            self.where = self.where_class()
            self.add_q(Q(
                **{field.attname + '__in': pk_list[offset:offset + GET_ITERATOR_CHUNK_SIZE]}))
            num_deleted += self.do_query(self.get_meta().db_table, self.where, using=using)
        return num_deleted
test_edit_index.py 文件源码 项目:intel-manager-for-lustre 作者: intel-hpdd 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_populate(self):
        from chroma_core.services.plugin_runner.resource_manager import EdgeIndex

        resource_record, couplet_resource = self._make_global_resource('example_plugin', 'Couplet', {'address_1': 'foo', 'address_2': 'bar'})
        controller_resource = self._make_local_resource('example_plugin', 'Controller', index = 0, parents = [couplet_resource])

        self.resource_manager.session_open(self.plugin,
                                           resource_record.pk,
                                           [couplet_resource, controller_resource],
                                           60)

        # By not fetching the Couple and not fetching the plugin we should be left with 1 entry, this will raise an exception if the
        # result is not 1 entry.
        controller_record = StorageResourceRecord.objects.get(~Q(id = resource_record.pk), ~Q(id = self.plugin._scannable_id))

        index = EdgeIndex()
        index.populate()
        self.assertEqual(index.get_parents(controller_record.pk), [resource_record.pk])
        self.assertEqual(index.get_children(resource_record.pk), [controller_record.pk])
host.py 文件源码 项目:intel-manager-for-lustre 作者: intel-hpdd 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_usable_luns(cls, queryset):
        """
        Get all Luns which are not used by Targets and have enough VolumeNode configuration
        to be used as a Target (i.e. have only one node or at least have a primary node set)

        Luns are usable if they have only one VolumeNode (i.e. no HA available but
        we can definitively say where it should be mounted) or if they have
        a primary VolumeNode (i.e. one or more VolumeNodes is available and we
        know at least where the primary mount should be)
        """
        queryset = cls.get_unused_luns(queryset)\
                      .filter(volumenode__host__not_deleted=True)\
                      .annotate(has_primary=BoolOr('volumenode__primary'), num_volumenodes=Count('volumenode'))\
                      .filter(Q(num_volumenodes=1) | Q(has_primary=True))

        return queryset
subqueries.py 文件源码 项目:lifesoundtrack 作者: MTG 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def delete_batch(self, pk_list, using, field=None):
        """
        Set up and execute delete queries for all the objects in pk_list.

        More than one physical query may be executed if there are a
        lot of values in pk_list.
        """
        # number of objects deleted
        num_deleted = 0
        if not field:
            field = self.get_meta().pk
        for offset in range(0, len(pk_list), GET_ITERATOR_CHUNK_SIZE):
            self.where = self.where_class()
            self.add_q(Q(
                **{field.attname + '__in': pk_list[offset:offset + GET_ITERATOR_CHUNK_SIZE]}))
            num_deleted += self.do_query(self.get_meta().db_table, self.where, using=using)
        return num_deleted
test_resource_manager.py 文件源码 项目:rotest 作者: gregoil 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_lock_already_locked_resource(self):
        """Lock an already locked resource & validate failure.

        * Validates the DB initial state.
        * Locks an already locked resource, using resource client.
        * Validates a ResourceUnavailableError is raised.
        """
        resources_num = DemoResourceData.objects.filter(~Q(owner=""),
                                  name=self.LOCKED1_NAME).count()

        self.assertEquals(resources_num, 1, "Expected 1 locked "
                          "resource with name %r in DB found %d"
                          % (self.LOCKED1_NAME, resources_num))

        descriptor = Descriptor(DemoResource, name=self.LOCKED1_NAME)
        self.assertRaises(ResourceUnavailableError,
                          self.client._lock_resources,
                          descriptors=[descriptor],
                          timeout=self.LOCK_TIMEOUT)
manager.py 文件源码 项目:rotest 作者: gregoil 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def query_resources(self, request):
        """Find and return the resources that answer the client's query.

        Args:
            request (Request): QueryResources request.

        Returns:
            ResourcesReply. a reply containing matching resources.
        """
        desc = ResourceDescriptor.decode(request.message.descriptors)
        self.logger.debug("Looking for resources with description %r", desc)

        # query for resources that are usable and match the descriptors
        query = (Q(is_usable=True, **desc.properties))
        matches = desc.type.objects.filter(query)

        if matches.count() == 0:
            raise ResourceDoesNotExistError("No existing resource meets "
                                            "the requirements: %r" % desc)

        query_result = [resource for resource in matches]

        return ResourcesReply(resources=query_result)
query.py 文件源码 项目:liberator 作者: libscie 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def complex_filter(self, filter_obj):
        """
        Returns a new QuerySet instance with filter_obj added to the filters.

        filter_obj can be a Q object (or anything with an add_to_query()
        method) or a dictionary of keyword lookup arguments.

        This exists to support framework features such as 'limit_choices_to',
        and usually it will be more natural to use other methods.
        """
        if isinstance(filter_obj, Q) or hasattr(filter_obj, 'add_to_query'):
            clone = self._clone()
            clone.query.add_q(filter_obj)
            return clone
        else:
            return self._filter_or_exclude(None, **filter_obj)
subqueries.py 文件源码 项目:liberator 作者: libscie 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def delete_batch(self, pk_list, using):
        """
        Set up and execute delete queries for all the objects in pk_list.

        More than one physical query may be executed if there are a
        lot of values in pk_list.
        """
        # number of objects deleted
        num_deleted = 0
        field = self.get_meta().pk
        for offset in range(0, len(pk_list), GET_ITERATOR_CHUNK_SIZE):
            self.where = self.where_class()
            self.add_q(Q(
                **{field.attname + '__in': pk_list[offset:offset + GET_ITERATOR_CHUNK_SIZE]}))
            num_deleted += self.do_query(self.get_meta().db_table, self.where, using=using)
        return num_deleted
subqueries.py 文件源码 项目:djanoDoc 作者: JustinChavez 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def delete_batch(self, pk_list, using, field=None):
        """
        Set up and execute delete queries for all the objects in pk_list.

        More than one physical query may be executed if there are a
        lot of values in pk_list.
        """
        # number of objects deleted
        num_deleted = 0
        if not field:
            field = self.get_meta().pk
        for offset in range(0, len(pk_list), GET_ITERATOR_CHUNK_SIZE):
            self.where = self.where_class()
            self.add_q(Q(
                **{field.attname + '__in': pk_list[offset:offset + GET_ITERATOR_CHUNK_SIZE]}))
            num_deleted += self.do_query(self.get_meta().db_table, self.where, using=using)
        return num_deleted
subqueries.py 文件源码 项目:django-next-train 作者: bitpixdigital 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def delete_batch(self, pk_list, using, field=None):
        """
        Set up and execute delete queries for all the objects in pk_list.

        More than one physical query may be executed if there are a
        lot of values in pk_list.
        """
        # number of objects deleted
        num_deleted = 0
        if not field:
            field = self.get_meta().pk
        for offset in range(0, len(pk_list), GET_ITERATOR_CHUNK_SIZE):
            self.where = self.where_class()
            self.add_q(Q(
                **{field.attname + '__in': pk_list[offset:offset + GET_ITERATOR_CHUNK_SIZE]}))
            num_deleted += self.do_query(self.get_meta().db_table, self.where, using=using)
        return num_deleted
query.py 文件源码 项目:LatinSounds_AppEnviaMail 作者: G3ek-aR 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def complex_filter(self, filter_obj):
        """
        Returns a new QuerySet instance with filter_obj added to the filters.

        filter_obj can be a Q object (or anything with an add_to_query()
        method) or a dictionary of keyword lookup arguments.

        This exists to support framework features such as 'limit_choices_to',
        and usually it will be more natural to use other methods.
        """
        if isinstance(filter_obj, Q) or hasattr(filter_obj, 'add_to_query'):
            clone = self._clone()
            clone.query.add_q(filter_obj)
            return clone
        else:
            return self._filter_or_exclude(None, **filter_obj)
subqueries.py 文件源码 项目:LatinSounds_AppEnviaMail 作者: G3ek-aR 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def delete_batch(self, pk_list, using):
        """
        Set up and execute delete queries for all the objects in pk_list.

        More than one physical query may be executed if there are a
        lot of values in pk_list.
        """
        # number of objects deleted
        num_deleted = 0
        field = self.get_meta().pk
        for offset in range(0, len(pk_list), GET_ITERATOR_CHUNK_SIZE):
            self.where = self.where_class()
            self.add_q(Q(
                **{field.attname + '__in': pk_list[offset:offset + GET_ITERATOR_CHUNK_SIZE]}))
            num_deleted += self.do_query(self.get_meta().db_table, self.where, using=using)
        return num_deleted
dockerauth.py 文件源码 项目:authserver 作者: jdelic 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _remove_registry(self, name: Optional[str]=None, client_id: Optional[str]=None, force: bool=False) -> None:
        query = Q()
        if name:
            query |= Q(name__exact=name)
        if client_id:
            query |= Q(client_id__exact=client_id)

        registry = DockerRegistry.objects.filter(query)
        if registry.count() == 0:
            self.stderr.write("No matching registry found for the given criteria.")
            sys.exit(1)
        elif registry.count() > 1:
            self.stderr.write("Criteria matched more than a single registry.")
            sys.exit(1)
        else:
            self.stdout.write("\nRegistry-----------\nName:      %s\nClient id: %s\n\n" %
                              (registry[0].name, registry[0].client_id))
            if force or self._ask_confirmation("Really delete the above registry? [yN]", default=False):
                regname = registry[0].name
                registry.delete()
                self.stderr.write(self.style.SUCCESS("Removed docker registry \"%s\"." % regname))
                return
            else:
                sys.exit(1)
filterbackends.py 文件源码 项目:tunga-api 作者: tunga-io 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def filter_list_queryset(self, request, queryset, view):
        if request.user.is_authenticated():
            if request.user.is_staff or request.user.is_superuser:
                queryset = queryset.filter(
                    Q(channeluser__user=request.user) | Q(type=CHANNEL_TYPE_SUPPORT) | Q(type=CHANNEL_TYPE_DEVELOPER)
                )
            elif request.user.is_developer:
                queryset = queryset.filter(
                    Q(channeluser__user=request.user) | Q(type=CHANNEL_TYPE_DEVELOPER)
                )
            else:
                queryset = queryset.filter(channeluser__user=request.user)
            if not request.query_params.get('type', None):
                queryset = queryset.exclude(type=CHANNEL_TYPE_SUPPORT)
            return queryset
        return queryset.none()
filters.py 文件源码 项目:tunga-api 作者: tunga-io 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def filter_payment_status(self, queryset, name, value):
        queryset = queryset.filter(closed=True)
        if value in ['paid', 'processing']:
            request = self.request
            is_po = request and request.user and request.user.is_authenticated() and request.user.is_project_owner and not request.user.is_admin
            if value == 'paid':
                return is_po and queryset or queryset.filter(paid=True, pay_distributed=True)
            else:
                processing_filter = (Q(processing=True) & Q(paid=False))
                if not is_po:
                    processing_filter = processing_filter | (Q(paid=True) & Q(pay_distributed=False))
                return queryset.filter(processing_filter)
        elif value == 'pending':
            queryset = queryset.filter(processing=False, paid=False)
        elif value == 'distribute':
            queryset = queryset.filter(
                payment_method=TASK_PAYMENT_METHOD_STRIPE,
                paid=True, btc_paid=False, pay_distributed=False
            )
        return queryset
filterbackends.py 文件源码 项目:tunga-api 作者: tunga-io 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def filter_list_queryset(self, request, queryset, view):
        label_filter = request.query_params.get('filter', None)
        threshold_date = datetime.datetime.utcnow() - relativedelta(hours=24)
        if label_filter == 'upcoming':
            queryset = queryset.filter(
                due_at__gt=threshold_date, progressreport__isnull=True
            )
        elif label_filter in ['complete', 'finished']:
            queryset = queryset.filter(
                progressreport__isnull=False
            )
        elif label_filter == 'missed':
            queryset = queryset.filter(
                due_at__lt=threshold_date, progressreport__isnull=True
            )
        if request.user.is_staff or request.user.is_superuser:
            return queryset
        return queryset.filter(
            Q(created_by=request.user) |
            Q(task__user=request.user) |
            (
                Q(task__participation__user=request.user) &
                Q(task__participation__status__in=[STATUS_INITIAL, STATUS_ACCEPTED])
            )
        )
models.py 文件源码 项目:tunga-api 作者: tunga-io 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def has_object_read_permission(self, request):
        if str(self.edit_token) == get_edit_token_header(request) or request.user == self.user or \
                (self.parent and request.user == self.parent.user) or \
                self.has_admin_access(request.user) or \
                (request.user.is_authenticated() and request.user.is_project_manager): #and (self.pm == request.user or not self.pm)):
            return True
        elif self.visibility == VISIBILITY_DEVELOPER:
            return request.user.is_authenticated() and request.user.is_developer
        elif self.visibility == VISIBILITY_MY_TEAM:
            return bool(
                Connection.objects.exclude(status=STATUS_REJECTED).filter(
                    Q(from_user=self.user, to_user=request.user) | Q(from_user=request.user, to_user=self.user)
                ).count()
            )
        elif self.visibility == VISIBILITY_CUSTOM:
            return self.subtask_participants_inclusive_filter.filter(
                user=request.user, status__in=[STATUS_INITIAL, STATUS_ACCEPTED]
            ).count()
        return False
querysets.py 文件源码 项目:django-datawatch 作者: RegioHelden 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def for_user(self, user):
        return self.filter(Q(assigned_to_group__isnull=True) | Q(assigned_to_group__in=user.groups.all()),
                           Q(assigned_to_user__isnull=True) | Q(assigned_to_user=user))
subqueries.py 文件源码 项目:CodingDojo 作者: ComputerSocietyUNB 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def delete_qs(self, query, using):
        """
        Delete the queryset in one SQL query (if possible). For simple queries
        this is done by copying the query.query.where to self.query, for
        complex queries by using subquery.
        """
        innerq = query.query
        # Make sure the inner query has at least one table in use.
        innerq.get_initial_alias()
        # The same for our new query.
        self.get_initial_alias()
        innerq_used_tables = [t for t in innerq.tables
                              if innerq.alias_refcount[t]]
        if not innerq_used_tables or innerq_used_tables == self.tables:
            # There is only the base table in use in the query.
            self.where = innerq.where
        else:
            pk = query.model._meta.pk
            if not connections[using].features.update_can_self_select:
                # We can't do the delete using subquery.
                values = list(query.values_list('pk', flat=True))
                if not values:
                    return 0
                return self.delete_batch(values, using)
            else:
                innerq.clear_select_clause()
                innerq.select = [
                    pk.get_col(self.get_initial_alias())
                ]
                values = innerq
            self.where = self.where_class()
            self.add_q(Q(pk__in=values))
        cursor = self.get_compiler(using).execute_sql(CURSOR)
        return cursor.rowcount if cursor else 0
subqueries.py 文件源码 项目:CodingDojo 作者: ComputerSocietyUNB 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def update_batch(self, pk_list, values, using):
        self.add_update_values(values)
        for offset in range(0, len(pk_list), GET_ITERATOR_CHUNK_SIZE):
            self.where = self.where_class()
            self.add_q(Q(pk__in=pk_list[offset: offset + GET_ITERATOR_CHUNK_SIZE]))
            self.get_compiler(using).execute_sql(NO_RESULTS)
expressions.py 文件源码 项目:CodingDojo 作者: ComputerSocietyUNB 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, condition=None, then=None, **lookups):
        if lookups and condition is None:
            condition, lookups = Q(**lookups), None
        if condition is None or not isinstance(condition, Q) or lookups:
            raise TypeError("__init__() takes either a Q object or lookups as keyword arguments")
        super(When, self).__init__(output_field=None)
        self.condition = condition
        self.result = self._parse_expressions(then)[0]
querysets.py 文件源码 项目:ODM2WebSDL 作者: ODM2 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def deployments(self):
        return self.filter(Q(action_type='Equipment deployment') | Q(action_type='Instrument deployment'))
lookups.py 文件源码 项目:djangocms-votes 作者: luisza 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def get_query(self, q, request):
        return self.model.objects.filter(
            Q(translations__title__icontains=q
              ) | Q(translations__lead_in__icontains=q)
        ).order_by('translations__slug')[:20]
lookups.py 文件源码 项目:djangocms-votes 作者: luisza 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_query(self, q, request):
        return self.model.objects.filter(
            Q(translations__name__icontains=q
              ) | Q(user__first_name__icontains=q
                    ) | Q(user__last_name__icontains=q)
        ).order_by('translations__name')[:20]
lookups.py 文件源码 项目:djangocms-votes 作者: luisza 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def get_query(self, q, request):
        return self.model.objects.filter(
            Q(translations__name__icontains=q
              ) | Q(translations__description__icontains=q)).order_by(
                  'translations__name')[:20]
host.py 文件源码 项目:intel-manager-for-lustre 作者: intel-hpdd 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def save(self, *args, **kwargs):
        try:
            ManagedHost.objects.get(~Q(pk = self.pk), fqdn = self.fqdn)
            raise IntegrityError("FQDN %s in use" % self.fqdn)
        except ManagedHost.DoesNotExist:
            pass

        super(ManagedHost, self).save(*args, **kwargs)
resource_manager.py 文件源码 项目:intel-manager-for-lustre 作者: intel-hpdd 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def populate(self):
        for srr in StorageResourceRecord.objects.filter(~Q(parents = None)).values('id', 'parents'):
            child = srr['id']
            parent = srr['parents']
            self.add_parent(child, parent)
views.py 文件源码 项目:drapo 作者: andgein 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def attempts(request, contest_id):
    contest = get_object_or_404(models.TaskBasedContest, pk=contest_id)

    attempts = contest.attempts.order_by('-created_at').select_related(
        'task', 'participant', 'participant__teamparticipant', 'participant__individualparticipant', 'author'
    )

    form = forms.AttemptsSearchForm(data=request.GET)
    if form.is_valid():
        pattern = form.cleaned_data['pattern']
        if pattern != '':
            attempts = attempts.filter(Q(task__name__icontains=pattern) |
                                       Q(author__username__icontains=pattern) |
                                       Q(author__first_name__icontains=pattern) |
                                       Q(author__last_name__icontains=pattern) |
                                       Q(participant__teamparticipant__team__name__icontains=pattern) |
                                       Q(answer__icontains=pattern))

    return render(request, 'contests/attempts.html', {
        'current_contest': contest,

        'contest': contest,
        'pattern': pattern,
        'attempts': attempts,
        'form': form,
    })
models.py 文件源码 项目:drapo 作者: andgein 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_open_tasks(self, participant):
        return ManualOpenedTask.objects.filter(
            contest=self.contest
        ).filter(
            Q(participant__isnull=True) | Q(participant=participant)
        ).values_list('task_id', flat=True)
subqueries.py 文件源码 项目:lifesoundtrack 作者: MTG 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def delete_qs(self, query, using):
        """
        Delete the queryset in one SQL query (if possible). For simple queries
        this is done by copying the query.query.where to self.query, for
        complex queries by using subquery.
        """
        innerq = query.query
        # Make sure the inner query has at least one table in use.
        innerq.get_initial_alias()
        # The same for our new query.
        self.get_initial_alias()
        innerq_used_tables = [t for t in innerq.tables
                              if innerq.alias_refcount[t]]
        if not innerq_used_tables or innerq_used_tables == self.tables:
            # There is only the base table in use in the query.
            self.where = innerq.where
        else:
            pk = query.model._meta.pk
            if not connections[using].features.update_can_self_select:
                # We can't do the delete using subquery.
                values = list(query.values_list('pk', flat=True))
                if not values:
                    return 0
                return self.delete_batch(values, using)
            else:
                innerq.clear_select_clause()
                innerq.select = [
                    pk.get_col(self.get_initial_alias())
                ]
                values = innerq
            self.where = self.where_class()
            self.add_q(Q(pk__in=values))
        cursor = self.get_compiler(using).execute_sql(CURSOR)
        return cursor.rowcount if cursor else 0


问题


面经


文章

微信
公众号

扫码关注公众号