python类queries()的实例源码

queries.py 文件源码 项目:django-performance-testing 作者: PaesslerAG 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def get_sample_results(cls):
        """Return one sample result per predefined mix of queries.

        For every mix a fresh ``cls()`` instance receives a fake ``queries``
        list (each entry holds the SQL text and a random ``time`` string with
        three decimals) and the value of its ``get_results_to_send()`` call is
        collected, in the order the mixes are listed below.
        """
        select_sql = (
            """QUERY - 'SELECT "auth_user"."id" FROM "auth_group"'"""
            """- PARAMS = ()"""
        )
        update_sql = (
            """QUERY - 'UPDATE "auth_group" SET "name" = %s'"""
            """- PARAMS = ('bar',)"""
        )
        begin_sql = """QUERY - 'BEGIN TRANSACTION' - PARAMS = ()"""

        query_mixes = (
            (select_sql,),
            (select_sql, update_sql),
            (select_sql, select_sql),
            (update_sql, update_sql),
            (begin_sql, begin_sql),
            (select_sql, update_sql, begin_sql),
        )

        sample_results = []
        for mix in query_mixes:
            collector = cls()
            collector.queries = [
                {'sql': statement, 'time': '%.3f' % random.random()}
                for statement in mix
            ]
            sample_results.append(collector.get_results_to_send())
        return sample_results
debug_tools.py 文件源码 项目:zing 作者: evernote 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_debug_sql_logger(caplog, settings):
    """Check that ``log_new_queries`` logs the timing and SQL of new queries.

    With no query executed since the snapshot nothing must be logged; after
    one ``COUNT`` query the first log record must hold the timing and the
    second one the SQL text.

    :param caplog: log-capturing fixture exposing ``records``
    :param settings: settings object whose ``DEBUG`` flag is switched on
        (presumably the pytest-django fixture -- confirm)
    """
    from pootle_project.models import Project

    from django.db import connection

    settings.DEBUG = True

    queries = len(connection.queries)

    # no query has run since the snapshot, so nothing may be logged
    log_new_queries(queries)
    assert caplog.records == []

    # trigger some sql and log
    Project.objects.count()
    log_new_queries(queries)

    timing = caplog.records[0].message
    sql = caplog.records[1].message

    # match the timing, sql
    # raw string: "\d" in a normal literal is an invalid escape sequence
    assert re.match(r"^\d+?\.\d+?$", timing)
    assert "SELECT COUNT" in sql
    assert "pootle_app_project" in sql
utils.py 文件源码 项目:zing 作者: evernote 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def log_test_timing(debug_logger, timings, name, start):
    """Store timing/query statistics for one test and log a summary row.

    :param debug_logger: object whose ``debug`` method receives the row
    :param timings: dict; the entry ``timings["tests"][name]`` is written
    :param name: key of the test, also the first column of the logged row
    :param start: ``time.time()`` value taken when the test started
    """
    from django.db import connection

    elapsed = time.time() - start

    # keep every recorded query whose reported time is above zero
    slow = []
    for query in connection.queries:
        if float(query["time"]) > 0:
            slow.append(query)

    timings["tests"][name] = {
        "slow_queries": slow,
        "query_count": len(connection.queries),
        "timing": elapsed,
    }

    # left-aligned columns: name, rounded duration, number of queries
    row = "{: <70} {: <10} {: <10}".format(
        name, round(elapsed, 4), len(connection.queries))
    debug_logger.debug(row)
utils.py 文件源码 项目:zing 作者: evernote 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def log_test_report(debug_logger, timings):
    """Log the closing summary of a test run.

    :param debug_logger: object whose ``debug`` method receives the lines
    :param timings: dict with a ``"start"`` timestamp and a ``"tests"``
        mapping whose values each carry a ``"query_count"``
    """
    separator = "=" * 80
    debug_logger.debug("%s\nTESTS END: %s", separator, datetime.now())

    elapsed = time.time() - timings["start"]
    query_total = 0
    for test_stats in timings["tests"].values():
        query_total += test_stats["query_count"]

    # the average is only meaningful (and computable) with at least one query
    if query_total:
        debug_logger.debug(
            "TESTS AVERAGE query time: %s", elapsed / query_total)

    debug_logger.debug("TESTS TOTAL test time: %s", elapsed)
    debug_logger.debug("TESTS TOTAL queries: %s", query_total)
    debug_logger.debug("%s\n" % separator)
tastypie_test.py 文件源码 项目:intel-manager-for-lustre 作者: intel-hpdd 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def assertQueries(self, *prefixes):
        """Assert the correct queries are efficiently executed for a block.

        Generator meant to be driven as a context manager (presumably wrapped
        by ``contextlib.contextmanager`` outside this view -- confirm).

        :param prefixes: either a single int, the exact number of queries the
            block must execute, or a sequence of SQL prefixes that the
            executed queries must match one-to-one and in order.  Passing no
            prefixes asserts that no query is executed.
        :raises AssertionError: when the executed queries do not match.
        """
        debug = connection.use_debug_cursor
        connection.use_debug_cursor = True

        # ``izip_longest`` only exists on Python 2, ``zip_longest`` on 3.
        zip_longest = (getattr(itertools, 'zip_longest', None) or
                       itertools.izip_longest)

        count = len(connection.queries)
        try:
            yield

            executed = connection.queries[count:]
            if prefixes and isinstance(prefixes[0], int):
                assert prefixes[0] == len(executed), (prefixes[0], len(executed))
            else:
                # zip_longest pads the shorter side with None, so a missing
                # or surplus query fails the assertion as well
                for prefix, query in zip_longest(prefixes, executed):
                    assert prefix and query and query['sql'].startswith(prefix), (prefix, query)
        finally:
            # restore the flag even when the block or an assertion raised
            connection.use_debug_cursor = debug
test_available_transitions.py 文件源码 项目:intel-manager-for-lustre 作者: intel-hpdd 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_no_locks_query_count(self):
        """Guard the number of queries used to look up available transitions.

        If this test fails, either adjust EXPECTED_QUERIES or find out why
        the query count regressed.
        """

        EXPECTED_QUERIES = 0

        #  no jobs are locking this object
        downcast_host = self.host.downcast()
        content_type = ContentType.objects.get_for_model(downcast_host)
        host_ct_key = content_type.natural_key()
        host_id = self.host.id

        #  Loads up the caches
        scheduler = JobScheduler()

        # only queries issued from here on are counted
        reset_queries()
        scheduler.available_transitions([(host_ct_key, host_id, ), ])

        query_sum = len(connection.queries)
        failure_message = (
            "something changed with queries! "
            "got %s expected %s" % (query_sum, EXPECTED_QUERIES))
        self.assertEqual(query_sum, EXPECTED_QUERIES, failure_message)
decorators.py 文件源码 项目:djangolab 作者: DhiaTN 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def query_statistic(func):
    """Decorate ``func`` to print how many queries it ran and their total time.

    After every call the wrapper prints one ``[Statistics]`` line with the
    number of queries added to ``connection.queries`` during the call
    (``BEGIN`` statements are ignored) and the sum of their ``time`` values.

    :param func: the callable to measure
    :return: the wrapped callable, which returns whatever ``func`` returns
    """
    @wraps(func)
    def func_wrapper(*args, **kwargs):
        queries_before = len(connection.queries)
        result = func(*args, **kwargs)

        # Slice from the starting offset.  A negative slice such as
        # ``queries[-0:]`` would select the *whole* log when the call ran no
        # query at all and report unrelated, older queries.
        performed_query_list = connection.queries[queries_before:]

        queries_number = 0
        total_time = 0.0
        for query in performed_query_list:
            if query['sql'] == 'BEGIN':
                continue  # ignore begin transaction
            queries_number += 1
            total_time += float(query['time'])

        message = "[Statistics] : {total} queries performed in {time}s."
        print(message.format(total=queries_number, time=total_time))
        return result
    return func_wrapper
middlewares.py 文件源码 项目:grical 作者: wikical 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def display_queries(request, stats, queries):
    """
    Build the HttpResponse listing the SQL queries of a profiling run.

    _stats_ should contain a pstats.Stats of a hotshot session.
    _queries_ should contain a list of SQL queries.

    The sort order is read from the ``sort_by`` request parameter and
    defaults to ``time``.
    """
    sort = request.REQUEST.get('sort_by', 'time')
    sort_choices = (('order', 'by order'),
                    ('time', 'time'),
                    ('queries', 'query count'))
    sort_buttons = RadioButtons('sort_by', sort, sort_choices)

    # render, rewind the buffer, then HTML-escape it line by line
    rendered = render_queries(queries, sort)
    rendered.reset()
    escaped_lines = []
    for line in rendered.readlines():
        escaped_lines.append(html.escape(unicode(line)))

    response = HttpResponse(mimetype='text/html; charset=utf-8')
    template_values = {
        'sort_buttons': sort_buttons,
        'num_queries': len(queries),
        'queries': "".join(escaped_lines),
        # raw data is embedded base64-encoded so the page can post it back
        # (presumably for re-sorting -- confirm against the template)
        'rawqueries' : b64encode(cPickle.dumps(queries)),
        'rawstats': b64encode(pickle_stats(stats)),
        'url': request.path,
    }
    response.content = queries_template % template_values
    return response
middleware.py 文件源码 项目:django-icekit 作者: ic-labs 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def display_queries(request, stats, queries):
    """
    Return an HttpResponse showing the SQL queries of a profiling run.

    _stats_ should contain a pstats.Stats of a hotshot session.
    _queries_ should contain a list of SQL queries.

    The ``sort_by`` request parameter selects the ordering; it defaults to
    ``time``.
    """
    sort = request.REQUEST.get('sort_by', 'time')
    choices = (
        ('order', 'by order'),
        ('time', 'time'),
        ('queries', 'query count'),
    )
    sort_buttons = RadioButtons('sort_by', sort, choices)

    # rewind the rendered buffer before reading it back line by line
    buf = render_queries(queries, sort)
    buf.reset()
    escaped = [html.escape(unicode(raw_line)) for raw_line in buf.readlines()]

    response = HttpResponse(mimetype='text/html; charset=utf-8')
    substitutions = {}
    substitutions['sort_buttons'] = sort_buttons
    substitutions['num_queries'] = len(queries)
    substitutions['queries'] = "".join(escaped)
    substitutions['rawqueries'] = b64encode(cPickle.dumps(queries))
    substitutions['rawstats'] = b64encode(pickle_stats(stats))
    substitutions['url'] = request.path
    response.content = queries_template % substitutions
    return response
views.py 文件源码 项目:WebHashcat 作者: hegusung 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def csv_masks(request, hashfile_id):
    """Return the password-mask statistics of a hashfile as a CSV download.

    Each CSV row holds the number of cracked entries and the password mask
    they share, most frequent mask first.

    :param request: the incoming request (not inspected here)
    :param hashfile_id: id of the ``Hashfile``; a 404 is raised through
        ``get_object_or_404`` when it does not exist
    :return: ``HttpResponse`` carrying the CSV as an attachment
    """
    hashfile = get_object_or_404(Hashfile, id=hashfile_id)

    # No way was found to express this query in pure Django, so raw SQL is
    # used; the hashfile id is passed as a bound parameter.
    res = Cracked.objects.raw("SELECT id, password_mask, COUNT(*) AS count FROM Hashcat_cracked USE INDEX (hashfileid_id_index) WHERE hashfile_id=%s GROUP BY password_mask ORDER BY count DESC", [hashfile.id])

    # 'w+' because the buffer is read back below; newline='' leaves line
    # endings to the csv module; the context manager closes the buffer.
    with tempfile.SpooledTemporaryFile(mode='w+', newline='') as fp:
        csvfile = csv.writer(fp, quotechar='"', quoting=csv.QUOTE_ALL)
        for item in res:
            csvfile.writerow([item.count, item.password_mask])
        fp.seek(0)   # rewind the file handle

        csvfile_data = fp.read()

    # debug output: SQL and timing of the most recent query, if any
    for query in connection.queries[-1:]:
        print(query["sql"])
        print(query["time"])

    response = HttpResponse(csvfile_data, content_type='application/force-download') # mimetype is replaced by content_type for django 1.7
    response['Content-Disposition'] = 'attachment; filename=%s_masks.csv' % hashfile.name
    return response
utils.py 文件源码 项目:montage 作者: storyful 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def log_sql_queries_to_console(path):
    """
        Logs SQL queries to terminal if in debug mode.

        Nothing happens unless ``settings.DEBUG`` is truthy and
        ``connection.queries`` holds at least one entry.  Otherwise an
        ANSI-coloured report is built (a start line containing ``path``, one
        line per query with its time and whitespace-collapsed SQL, and a
        summary line with the query count and the summed time) and printed
        UTF-8 encoded.

        We need to import connection at runtime as this is
        used in the wsgi handlers for the API endpoints and
        django settings are not available at import time there.

        :param path: value interpolated into the "Request Started" line
    """
    from django.db import connection
    if settings.DEBUG and len(connection.queries) > 0:
        total_time = 0
        output = "\033[1;31m[Request Started: %s]\033[0m\n" % (path)
        for query in connection.queries:
            # the 'time' entry of each query is parsed as a float and summed
            total_time += float(query.get('time'))
            # " ".join(sql.split()) collapses every whitespace run in the
            # SQL to a single space so each query fits on one line
            output = output + "\033[1;31m[%s]\033[0m \033[1m%s\033[0m\n" % (
                query.get('time'), " ".join(query['sql'].split()))
        # NOTE(review): the backslash continuation keeps the leading spaces
        # of the following source line inside the string literal, so they
        # end up in the printed summary -- confirm this is intended
        output = output + "\033[1;31m[Request Finished: %s queries in %s seconds] \
        \033[0m" % (len(connection.queries), total_time)
        # Python 2 print statement; the text is encoded to UTF-8 bytes first
        print output.encode('utf-8')
middleware.py 文件源码 项目:studentsdb2 作者: trivvet 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __call__(self, request):
        """Run the view and inject the database time into HTML responses.

        The time of all queries logged while the response was produced is
        summed and substituted for the ``<p id="response-time-db">`` marker
        of ``text/html`` responses.  The response is returned untouched when
        ``settings.DEBUG`` equals ``False`` and no query had been logged
        before the view ran.
        """
        # number of db queries already logged before the view runs
        queries_before = len(connection.queries)

        # produce the response (this is what gets timed)
        response = self.get_response(request)

        # deliberately ``== False`` (not ``not``), as in the original check
        if settings.DEBUG == False and not queries_before:
            return response

        # db time of the queries that were just run
        executed = len(connection.queries) - queries_before
        db_time = 0.0
        if executed:
            new_queries = connection.queries[queries_before:]
            db_time = reduce(add, [float(q['time']) for q in new_queries])

        content_type = response.get('Content-Type', '')
        if 'text/html' in content_type:
            marker = '<p id="response-time-db">'
            replacement = '<p class="bg-info">Database found took %s' % str(db_time)
            response.content = response.content.replace(marker, replacement)

        return response
tests.py 文件源码 项目:django_postgres_extensions 作者: primal100 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_traverse_GFK(self):
        """
        Check that prefetch_related() can traverse a 'content_object' and
        reach related objects on the other side (assuming it is suitably
        filtered).
        """
        tagged_objects = (
            self.book1, self.book2, self.book3, self.reader1, self.reader2,
        )
        for tagged in tagged_objects:
            TaggedItem.objects.create(tag="awesome", content_object=tagged)

        ct = ContentType.objects.get_for_model(Book)

        # We get 3 queries - 1 for main query, 1 for content_objects since they
        # all use the same table, and 1 for the 'read_by' relation.
        with self.assertNumQueries(3):
            # Limiting to books guarantees that every content object has a
            # 'read_by' attribute, so the prefetch below makes sense.
            qs = TaggedItem.objects.filter(content_type=ct, tag='awesome')
            qs = qs.prefetch_related('content_object__read_by')
            readers_of_awesome_books = set()
            for tag in qs:
                for reader in tag.content_object.read_by.all():
                    readers_of_awesome_books.add(reader.name)
            self.assertEqual(readers_of_awesome_books, {"me", "you", "someone"})
queries.py 文件源码 项目:django-performance-testing 作者: PaesslerAG 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def setup_sending_before_clearing_queries_log_signal():
    """Wrap ``connection.queries_log`` so clearing it first sends a signal.

    The installed proxy sends ``before_clearing_queries_log`` with a tuple
    snapshot of the logged queries and only then clears the wrapped log.
    """
    class SignalSendingBeforeClearingQueriesProxy(DelegatingProxy):
        def clear(self):
            # take the snapshot before the wrapped log drops its entries
            snapshot = tuple(self)
            before_clearing_queries_log.send(sender=None, queries=snapshot)
            self.wrapped.clear()

    original_log = connection.queries_log
    connection.queries_log = SignalSendingBeforeClearingQueriesProxy(
        original_log)
queries.py 文件源码 项目:django-performance-testing 作者: PaesslerAG 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def __init__(self, queries, name):
        """Keep ``queries`` and initialise the parent with their count.

        ``self.number_of_queries`` is passed as the parent's ``value``
        (presumably a property defined on this class -- confirm).
        """
        self.queries = queries
        parent = super(QueryCountResult, self)
        parent.__init__(name=name, value=self.number_of_queries)
queries.py 文件源码 项目:django-performance-testing 作者: PaesslerAG 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def number_of_queries(self):
        """Return how many entries ``self.queries`` holds."""
        stored_queries = self.queries
        return len(stored_queries)
queries.py 文件源码 项目:django-performance-testing 作者: PaesslerAG 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __enter__(self):
        """Start collecting queries and return ``self``.

        Remembers how many queries are already logged, forces the debug
        cursor on (keeping the previous flag in
        ``orig_force_debug_cursor``) and subscribes to the
        ``before_clearing_queries_log`` signal.
        """
        self.queries = []
        already_logged = len(connection.queries)
        self.nr_of_queries_when_entering = already_logged

        # keep the previous flag on the instance before forcing it on
        self.orig_force_debug_cursor = connection.force_debug_cursor
        connection.force_debug_cursor = True

        handler = self.queries_about_to_be_reset_handler
        before_clearing_queries_log.connect(handler)
        return self
queries.py 文件源码 项目:django-performance-testing 作者: PaesslerAG 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def queries_about_to_be_reset_handler(self, signal, sender, queries, **kwargs):
        """Signal receiver: save the queries seen so far, then restart at 0.

        The ``signal``, ``sender`` and ``queries`` arguments are accepted but
        not used; the queries are stored through ``self.store_queries()``.
        """
        self.store_queries()
        # the log is about to be emptied, so the next slice starts at index 0
        self.nr_of_queries_when_entering = 0
queries.py 文件源码 项目:django-performance-testing 作者: PaesslerAG 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def store_queries(self):
        """Append the queries logged since the remembered offset."""
        first_new = self.nr_of_queries_when_entering
        fresh_queries = connection.queries[first_new:]
        self.queries += fresh_queries
functions.py 文件源码 项目:WPS-4th 作者: Fastcampus-WPS 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def lsql():
    """Print the most recent entry of ``connection.queries``."""
    latest_query = connection.queries[-1]
    print(latest_query)
context_processors.py 文件源码 项目:CodingDojo 作者: ComputerSocietyUNB 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def debug(request):
    """
    Return context variables helpful for debugging.

    The result is empty unless ``settings.DEBUG`` is set and the request's
    ``REMOTE_ADDR`` is listed in ``settings.INTERNAL_IPS``.
    """
    context_extras = {}
    if not settings.DEBUG:
        return context_extras
    if request.META.get('REMOTE_ADDR') not in settings.INTERNAL_IPS:
        return context_extras

    from django.db import connection

    context_extras['debug'] = True
    # Lazy reference: connection.queries is computed on access, so it also
    # contains the queries triggered after this function has run.
    context_extras['sql_queries'] = lazy(lambda: connection.queries, list)
    return context_extras
test_sample.py 文件源码 项目:django-strict 作者: kyle-eshares 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_fetch(data):
    """Check that loading a book and fetching its author costs one query each.

    :param data: fixture argument, not used directly in the body
    """
    # DEBUG is switched on for the query counting below and restored
    # afterwards so the change does not leak into other tests.
    previous_debug = settings.DEBUG
    settings.DEBUG = True
    try:
        base_queries = len(connection.queries)

        book = Book.objects.first()
        assert len(connection.queries) - base_queries == 1
        author = book.fetch_author()
        assert len(connection.queries) - base_queries == 2
        assert author.id == book.author_id
    finally:
        settings.DEBUG = previous_debug
test_sample.py 文件源码 项目:django-strict 作者: kyle-eshares 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_queryset_methods2(data):
    """Check that ``to_list()`` runs one query per call and ``all()`` none.

    :param data: fixture argument, not used directly in the body
    """
    # DEBUG is switched on for the query counting below and restored
    # afterwards so the change does not leak into other tests.
    previous_debug = settings.DEBUG
    settings.DEBUG = True
    try:
        base_queries = len(connection.queries)

        # building the queryset must not touch the database
        book_qs = Book.objects.all()
        assert len(connection.queries) - base_queries == 0
        book_list = book_qs.to_list()
        assert len(connection.queries) - base_queries == 1
        # a second call is expected to hit the database again
        book_qs.to_list()
        assert len(connection.queries) - base_queries == 2
    finally:
        settings.DEBUG = previous_debug
middleware.py 文件源码 项目:blog 作者: tomming233 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def process_response(self, request, response):
        """Print every logged query when ``settings.DEBUG`` is on.

        Each line shows the query time followed by the SQL with its
        whitespace collapsed; the response is returned unchanged.
        """
        if not settings.DEBUG:
            return response

        line_format = "\033[1;31m[%s]\033[0m \033[1m%s\033[0m"
        for query in connection.queries:
            elapsed = query['time']
            compact_sql = " ".join(query['sql'].split())
            print(line_format % (elapsed, compact_sql))
        return response
debug.py 文件源码 项目:zing 作者: evernote 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def log_new_queries(queries, debug_logger=None):
    """Log time and SQL of every query recorded from index ``queries`` on.

    :param queries: offset into ``connection.queries``; earlier entries are
        skipped
    :param debug_logger: logger to write to; the module-level ``logger`` is
        used when this is falsy
    """
    from django.db import connection

    target = debug_logger if debug_logger else logger
    for entry in connection.queries[queries:]:
        target.debug(entry["time"])
        target.debug("\t%s", entry["sql"])
debug.py 文件源码 项目:zing 作者: evernote 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def debug_sql(debug_logger=None):
    """Generator that logs the queries run between its start and its end.

    ``settings.DEBUG`` is forced on while the generator is suspended at the
    ``yield``; afterwards the queries recorded since the start are handed to
    ``log_new_queries`` and the previous ``DEBUG`` value is put back.
    Presumably wrapped by ``contextlib.contextmanager`` outside this view --
    confirm.

    :param debug_logger: passed through to ``log_new_queries``
    """
    from django.conf import settings
    from django.db import connection

    previous_debug = settings.DEBUG
    settings.DEBUG = True
    offset = len(connection.queries)
    try:
        yield
    finally:
        log_new_queries(offset, debug_logger)
        settings.DEBUG = previous_debug
auth.py 文件源码 项目:ks-python-api 作者: Kriegspiel 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def post(self, request):
        """Sign a user in and return a fresh auth token.

        The POST data is loaded through ``SignInSerializer``; the user is
        looked up by e-mail and the password is checked.

        :raises AuthenticationError: when no user has that e-mail or the
            password check fails
        :return: ``ApiResponse`` carrying the value of the new token
        """
        data = SignInSerializer().load_data(request.POST)
        user = get_user_model().objects.filter(email=data['email']).first()
        if user is None or not user.check_password(data['password']):
            raise AuthenticationError()
        token = AuthToken.objects.create(user_id=user.id)
        # The leftover debugging ``print(connection.queries)`` was removed:
        # it dumped the SQL for the user lookup and the token creation
        # to stdout on every sign-in.
        return ApiResponse({'token': token.value})
test_query_scaling.py 文件源码 项目:intel-manager-for-lustre 作者: intel-hpdd 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _measure_scaling(self, create_n, measured_resource, scaled_resource = None):
        """
        Classify how the query count of a list request grows with N objects.

        :param create_n: Function to create N of scaled_resource
        :param measured_resource: The resource we will measure the query load for
        :param scaled_resource: The object which is actually being scaled with N
        :return: Instance of Order1, OrderN, OrderBad
        """
        if scaled_resource is None:
            scaled_resource = measured_resource

        samples = [5, 6, 7, 8]
        query_counts = {}
        for size in samples:
            ObjectCache.clear()
            create_n(size)
            # Queries get reset at the start of a request
            self.assertEqual(scaled_resource._meta.queryset.count(), size)
            url = "/api/%s/" % measured_resource._meta.resource_name
            response = self.api_client.get(url, data = {'limit': 0})
            failure_detail = "%s:%s" % (response.content, measured_resource._meta.resource_name)
            self.assertEqual(response.status_code, 200, failure_detail)
            queries_used = len(connection.queries)

            returned_objects = self.deserialize(response)['objects']
            self.assertEqual(len(returned_objects), measured_resource._meta.queryset.count())
            query_counts[size] = queries_used

        # Ignore samples[0], it was just to clear out any setup overhead from first call to API

        def gradient(lower, upper):
            # change in query count per additional object between two samples
            return (query_counts[upper] - query_counts[lower]) / (upper - lower)

        first_slope = gradient(samples[1], samples[2])
        second_slope = gradient(samples[2], samples[3])

        if first_slope == 0 and second_slope == 0:
            # Hoorah, O(1)
            return Order1(query_counts[samples[3]])
        if first_slope > 0 and first_slope == second_slope:
            # O(N)
            return OrderN(first_slope)
        # Worse than O(N)
        return OrderBad()
test_host.py 文件源码 项目:intel-manager-for-lustre 作者: intel-hpdd 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_cached_hosts(self):
        """Reading ``hosts`` twice must not add database queries."""
        host = self.hosts[1]
        instance = HostListMixin()
        instance.host_ids = json.dumps([host.id])
        self.assertListEqual(instance.hosts, [host])

        # second read: the query log must not grow
        queries_so_far = len(connection.queries)
        self.assertListEqual(instance.hosts, [host])
        self.assertEqual(queries_so_far, len(connection.queries))
test_host.py 文件源码 项目:intel-manager-for-lustre 作者: intel-hpdd 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_changing_hosts(self):
        """Changing ``host_ids`` must make ``hosts`` query the database again."""
        first_host = self.hosts[1]
        instance = HostListMixin()
        instance.host_ids = json.dumps([first_host.id])
        self.assertListEqual(instance.hosts, [first_host])

        # after switching the ids, the query log is expected to grow
        queries_so_far = len(connection.queries)
        second_host = self.hosts[0]
        instance.host_ids = json.dumps([second_host.id])
        self.assertListEqual(instance.hosts, [second_host])
        self.assertNotEqual(queries_so_far, len(connection.queries))


问题


面经


文章

微信
公众号

扫码关注公众号