python类queries()的实例源码

test_samples.py 文件源码 项目:intel-manager-for-lustre 作者: intel-hpdd 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def assertQueries(*prefixes):
    """Check that a block runs the expected queries and that each uses an index.

    This is a generator with a single ``yield``; it is presumably wrapped by a
    context-manager decorator that is not visible here (TODO confirm).

    The length of ``connection.queries`` is recorded before the ``yield``.
    Afterwards every query logged since then is paired, in order, with the
    expected SQL prefixes.  Pairing uses ``izip_longest``, so a missing or an
    extra query yields a ``None`` on one side and fails the first assertion.
    Each query is then re-run under ``EXPLAIN``; unless the expected prefix is
    exactly ``'INSERT'``, the plan text must contain ``'Index Scan'``.
    """
    already_logged = len(connection.queries)
    yield
    executed = connection.queries[already_logged:]
    for prefix, query in itertools.izip_longest(prefixes, executed):
        matches = prefix and query and query['sql'].startswith(prefix)
        assert matches, (prefix, query)
        # Ask the database how it would run the statement.
        cursor = connection.cursor()
        cursor.execute('EXPLAIN ' + query['sql'])
        plan = ''.join([line for (line,) in cursor])
        assert prefix == 'INSERT' or 'Index Scan' in plan, (plan, query)
test_available_jobs.py 文件源码 项目:intel-manager-for-lustre 作者: intel-hpdd 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_locks_query_count(self):
    """Check the number of queries issued when pulling in available jobs.

    Creates 200 jobs against ``self.host`` (alternating reboot and shutdown
    jobs), builds a ``JobScheduler``, resets the query log and then counts the
    queries logged by a single ``available_jobs`` call.

    NOTE(review): the assertion is ``assertGreaterEqual``, i.e. only a lower
    bound -- a growing query count would not fail this test.  Confirm that is
    intended.
    """
    minimum_queries = 6  # but 3 are for setup

    content_type = ContentType.objects.get_for_model(self.host.downcast())
    host_ct_key = content_type.natural_key()
    host_id = self.host.id

    # Create 200 host up and down jobs in the default 'pending' state; the
    # key point is that they are not in the 'complete' state.
    for job_num in xrange(200):
        job_class = ShutdownHostJob if job_num % 2 else RebootHostJob
        job_class.objects.create(host=self.host)

    # Loads up the caches, including the _lock_cache which should find
    # these jobs.
    scheduler = JobScheduler()

    reset_queries()

    # Getting jobs here may incur a higher cost.
    scheduler.available_jobs([(host_ct_key, host_id), ])

    query_sum = len(connection.queries)
    failure_message = ("something changed with queries! "
                       "got %s expected %s" % (query_sum, minimum_queries))
    self.assertGreaterEqual(query_sum, minimum_queries, failure_message)
test_available_transitions.py 文件源码 项目:intel-manager-for-lustre 作者: intel-hpdd 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_locks_query_count(self):
    """Check that ``available_jobs`` issues no queries once caches are warm.

    Creates 200 pending jobs against ``self.host`` (reboot jobs for even
    indexes, shutdown jobs for odd ones), builds a ``JobScheduler``, resets
    the query log and asserts that one ``available_jobs`` call logs exactly
    the expected number of queries (zero).
    """
    expected = 0

    # Object to be locked by jobs.
    host_model = self.host.downcast()
    host_ct_key = ContentType.objects.get_for_model(host_model).natural_key()
    host_id = self.host.id

    # Create 200 host up and down jobs in the default 'pending' state; the
    # key point is that they are not in the 'complete' state.
    job_classes = (RebootHostJob, ShutdownHostJob)
    for job_num in xrange(200):
        job_classes[job_num % 2].objects.create(host=self.host)

    # Loads up the caches, including the _lock_cache which should find
    # these jobs.
    scheduler = JobScheduler()

    reset_queries()

    # Getting jobs here may incur a higher cost.
    locked_items = [(host_ct_key, host_id), ]
    scheduler.available_jobs(locked_items)

    query_sum = len(connection.queries)
    self.assertEqual(
        query_sum,
        expected,
        "something changed with queries! "
        "got %s expected %s" % (query_sum, expected))
views.py 文件源码 项目:beg-django-e-commerce 作者: Apress 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def show_category(request, category_slug, template_name="catalog/category.html"):
    """Render the page for a single category.

    The category is looked up in ``cache`` under the request path.  On a miss
    it is fetched with ``get_object_or_404`` from ``Category.active`` by slug
    and written back to the cache with ``CACHE_TIMEOUT``.

    NOTE: the template context is ``locals()``, so every local name defined
    below (``c``, ``products``, ``page_title``, ``meta_keywords``,
    ``meta_description``, ``queries``, ``connection`` and the function
    arguments) is visible to the template.  Renaming or removing a local
    therefore changes what the template receives.
    """
    # The URL path of the request doubles as the cache key.
    category_cache_key = request.path
    c = cache.get(category_cache_key)
    if not c:
        c = get_object_or_404(Category.active, slug=category_slug)
        cache.set(category_cache_key, c, CACHE_TIMEOUT)
    products = c.product_set.filter(is_active=True)
    page_title = c.name
    meta_keywords = c.meta_keywords
    meta_description = c.meta_description

    # Expose the SQL query log to the template (picked up through locals()).
    from django.db import connection
    queries = connection.queries
    return render_to_response(template_name, locals(), context_instance=RequestContext(request))
middleware.py 文件源码 项目:fieldsight-kobocat 作者: awemulya 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def process_response(self, request, response):
        from sys import stdout
        if stdout.isatty():
            for query in connection.queries:
                print "\033[1;31m[%s]\033[0m \033[1m%s\033[0m" % (
                    query['time'], " ".join(query['sql'].split()))

        return response
middlewares.py 文件源码 项目:grical 作者: wikical 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def render_queries(queries, sort):
    """
    Return a StringIO containing the formatted SQL queries.

    _queries_ is an iterable of dicts with "sql" and "time" entries.
    _sort_ is a field to sort by:

    * 'order'   -- one line per query, in the order given;
    * 'time'    -- one line per distinct SQL text, largest total time first;
    * 'queries' -- one line per distinct SQL text, most executions first.

    Raises RuntimeError for any other _sort_ value.  The check is made up
    front; previously it lived inside the comparison function and so was
    skipped whenever there were fewer than two distinct queries to compare.
    """
    output = StringIO()
    if sort == 'order':
        output.write("     time query\n")
        for query in queries:
            output.write(" %8s %s\n" % (query["time"], query["sql"]))
        return output

    # Each aggregated item is (sql, [count, total_time]).
    sort_keys = {
        'time': lambda item: item[1][1],
        'queries': lambda item: item[1][0],
    }
    if sort not in sort_keys:
        raise RuntimeError("Unknown sort: %s" % sort)

    output.write("  queries     time query\n")
    totals = {}
    for query in queries:
        entry = totals.setdefault(query["sql"], [0, Decimal(0)])
        entry[0] += 1
        entry[1] += Decimal(query["time"])
    # key= replaces the Python-2-only cmp= argument; ordering (including the
    # treatment of ties under reverse=True) is the same.
    ranked = sorted(totals.items(), key=sort_keys[sort], reverse=True)
    for sql, (count, elapsed) in ranked:
        output.write(" %8d %8.3f %s\n" % (count, elapsed, sql))
    return output
middlewares.py 文件源码 项目:grical 作者: wikical 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def display_stats(request, stats, queries):
    """
    Generate a HttpResponse of functions for a profiling run.

    _stats_ should contain a pstats.Stats of a hotshot session.
    _queries_ should contain a list of SQL queries.

    The sort keys and output format are read from the request parameters
    'sort_first', 'sort_second' and 'format'.  The pickled, base64-encoded
    stats and queries are embedded in the page (template keys 'rawstats' and
    'rawqueries').
    """
    sort = [request.REQUEST.get('sort_first', 'time'),
            request.REQUEST.get('sort_second', 'calls')]
    # Named fmt rather than format so the builtin is not shadowed.
    fmt = request.REQUEST.get('format', 'print_stats')
    sort_first_buttons = RadioButtons('sort_first', sort[0],
                                      sort_categories)
    sort_second_buttons = RadioButtons('sort_second', sort[1],
                                       sort_categories)
    format_buttons = RadioButtons('format', fmt,
                                  (('print_stats', 'by function'),
                                   ('print_callers', 'by callers'),
                                   ('print_callees', 'by callees')))
    output = render_stats(stats, sort, fmt)
    # Rewind the buffer before reading the rendered report back.
    output.reset()
    output = [html.escape(unicode(line)) for line in output.readlines()]
    # content_type is accepted by every Django release; the older mimetype
    # keyword was removed in Django 1.7.
    response = HttpResponse(content_type='text/html; charset=utf-8')
    response.content = (stats_template %
                        {'format_buttons': format_buttons,
                         'sort_first_buttons': sort_first_buttons,
                         'sort_second_buttons': sort_second_buttons,
                         'rawqueries' : b64encode(cPickle.dumps(queries)),
                         'rawstats': b64encode(pickle_stats(stats)),
                         'stats': "".join(output),
                         'url': request.path})
    return response
middlewares.py 文件源码 项目:grical 作者: wikical 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def process_request(self, request):
    """
    Set up the profiler for a profiling run and clear the SQL query log.

    If this is a re-sort of an existing profiling run, just return
    the re-sorted list.

    Non-GET requests are only considered when their content type is a form
    encoding.  The media type is compared without its parameters, because
    real headers look like 'multipart/form-data; boundary=...' and the
    previous exact-string comparison never matched them.

    Returns an HttpResponse for a re-sort request, otherwise None.
    """
    def unpickle(params):
        # SECURITY NOTE(review): this unpickles data taken straight from
        # request parameters.  Unpickling attacker-controlled bytes can
        # execute arbitrary code; the only gate is the DEBUG / is_staff
        # check below.  Consider a signed or non-executable format.
        stats = unpickle_stats(b64decode(params.get('stats', '')))
        queries = cPickle.loads(b64decode(params.get('queries', '')))
        return stats, queries

    if request.method != 'GET':
        content_type = request.META.get(
            'HTTP_CONTENT_TYPE', request.META.get('CONTENT_TYPE', ''))
        # Drop parameters such as '; boundary=...' or '; charset=...'.
        media_type = (content_type or '').split(';', 1)[0].strip().lower()
        if media_type not in ('multipart/form-data',
                              'application/x-www-form-urlencoded'):
            return None
    if (request.REQUEST.get('profile', False) and
            (settings.DEBUG == True or request.user.is_staff)):
        request.statsfile = tempfile.NamedTemporaryFile()
        params = request.REQUEST
        if (params.get('show_stats', False)
                and params.get('show_queries', '1') == '1'):
            # Instantly re-sort the existing stats data
            stats, queries = unpickle(params)
            return display_stats(request, stats, queries)
        elif (params.get('show_queries', False)
              and params.get('show_stats', '1') == '1'):
            stats, queries = unpickle(params)
            return display_queries(request, stats, queries)
        else:
            # We don't have previous data, so initialize the profiler
            request.profiler = hotshot.Profile(request.statsfile.name)
            reset_queries()
    return None
middlewares.py 文件源码 项目:grical 作者: wikical 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def process_response(self, request, response):
    """Finish profiling and render the results.

    When the request carries no (truthy) ``profiler`` attribute the response
    is returned untouched.  Otherwise the profiler is closed, the stats are
    loaded from ``request.statsfile`` and the response is replaced by the
    query view (when 'show_queries' is set and 'show_stats' is '1' or
    absent) or by the stats view.
    """
    profiler = getattr(request, 'profiler', None)
    if not profiler:
        return response

    profiler.close()
    params = request.REQUEST
    stats = hotshot.stats.load(request.statsfile.name)
    queries = connection.queries
    wants_queries = (params.get('show_queries', False)
                     and params.get('show_stats', '1') == '1')
    if wants_queries:
        return display_queries(request, stats, queries)
    return display_stats(request, stats, queries)
middleware.py 文件源码 项目:studentsdb 作者: PyDev777 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def process_response(self, request, response):
    """Overlay database statistics on HTML responses while DEBUG is on.

    With ``settings.DEBUG`` truthy, the number of logged queries is stored
    in ``self.db_qcount`` and the sum of their times is added to
    ``self.db_time``.  If the response's Content-Type contains 'text/html'
    and the parsed document has a body, a fixed-position ``<code>`` tag with
    both figures is inserted as the body's first child and the response
    content is replaced by the prettified document.

    NOTE(review): ``db_time`` is accumulated with ``+=`` while ``db_qcount``
    is overwritten; whether ``db_time`` is reset elsewhere is not visible
    here.
    """
    if not settings.DEBUG:
        return response

    self.db_qcount = len(connection.queries)
    self.db_time += sum([float(entry['time']) for entry in connection.queries])

    if 'text/html' not in response.get('Content-Type', ''):
        return response

    soup = BeautifulSoup(response.content)
    if not soup.body:
        return response

    banner = soup.new_tag('code', style='position: fixed; top: 0; left: 0px')
    banner.string = 'DB took: %s, DB queries count: %s' % (
        str(self.db_time), str(self.db_qcount))
    soup.body.insert(0, banner)
    response.content = soup.prettify()
    return response
middleware.py 文件源码 项目:django-icekit 作者: ic-labs 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def render_queries(queries, sort):
    """
    Return a StringIO containing the formatted SQL queries.

    _queries_ is an iterable of dicts with "sql" and "time" entries.
    _sort_ is a field to sort by:

    * 'order'   -- one line per query, in the order given;
    * 'time'    -- one line per distinct SQL text, largest total time first;
    * 'queries' -- one line per distinct SQL text, most executions first.

    Raises RuntimeError for any other _sort_ value.

    Uses ``key=``, ``dict.items()`` and ``write()`` instead of the
    Python-2-only ``cmp=``, ``iteritems()`` and ``print >>``; the output is
    unchanged.
    """
    output = StringIO()
    if sort == 'order':
        output.write("     time query\n")
        for query in queries:
            output.write(" %8s %s\n" % (query["time"], query["sql"]))
        return output

    # Each aggregated item is (sql, [count, total_time]).
    if sort == 'time':
        def sort_key(item):
            return item[1][1]
    elif sort == 'queries':
        def sort_key(item):
            return item[1][0]
    else:
        raise RuntimeError("Unknown sort: %s" % sort)

    output.write("  queries     time query\n")
    totals = {}
    for query in queries:
        entry = totals.setdefault(query["sql"], [0, Decimal(0)])
        entry[0] += 1
        entry[1] += Decimal(query["time"])
    ranked = sorted(totals.items(), key=sort_key, reverse=True)
    for sql, (count, elapsed) in ranked:
        output.write(" %8d %8.3f %s\n" % (count, elapsed, sql))
    return output
middleware.py 文件源码 项目:django-icekit 作者: ic-labs 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def display_stats(request, stats, queries):
    """
    Build the HttpResponse that shows the function statistics of a run.

    _stats_ should contain a pstats.Stats of a hotshot session.
    _queries_ should contain a list of SQL queries.

    Sort keys and output format come from the request parameters
    'sort_first', 'sort_second' and 'format'.  The pickled, base64-encoded
    stats and queries are embedded in the page under the template keys
    'rawstats' and 'rawqueries'.
    """
    first_key = request.REQUEST.get('sort_first', 'time')
    second_key = request.REQUEST.get('sort_second', 'calls')
    layout = request.REQUEST.get('format', 'print_stats')

    sort_first_buttons = RadioButtons('sort_first', first_key, sort_categories)
    sort_second_buttons = RadioButtons('sort_second', second_key, sort_categories)
    layout_choices = (
        ('print_stats', 'by function'),
        ('print_callers', 'by callers'),
        ('print_callees', 'by callees'),
    )
    format_buttons = RadioButtons('format', layout, layout_choices)

    rendered = render_stats(stats, [first_key, second_key], layout)
    # Rewind the buffer before reading the report back line by line.
    rendered.reset()
    escaped_lines = [html.escape(unicode(raw)) for raw in rendered.readlines()]

    response = HttpResponse(content_type='text/html; charset=utf-8')
    context = {
        'format_buttons': format_buttons,
        'sort_first_buttons': sort_first_buttons,
        'sort_second_buttons': sort_second_buttons,
        'rawqueries': b64encode(cPickle.dumps(queries)),
        'rawstats': b64encode(pickle_stats(stats)),
        'stats': "".join(escaped_lines),
        'url': request.path,
    }
    response.content = stats_template % context
    return response
middleware.py 文件源码 项目:django-icekit 作者: ic-labs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def process_request(self, request):
    """
    Set up the profiler for a profiling run and clear the SQL query log.

    If this is a re-sort of an existing profiling run, just return
    the re-sorted list.

    Non-GET requests are only considered when their content type is a form
    encoding.  The media type is compared without its parameters, because
    real headers look like 'multipart/form-data; boundary=...' and the
    previous exact-string comparison never matched them.

    Returns an HttpResponse for a re-sort request, otherwise None.
    """
    def unpickle(params):
        # SECURITY NOTE(review): this unpickles data taken straight from
        # request parameters.  Unpickling attacker-controlled bytes can
        # execute arbitrary code; the only gate is the DEBUG / is_staff
        # check below.  Consider a signed or non-executable format.
        stats = unpickle_stats(b64decode(params.get('stats', '')))
        queries = cPickle.loads(b64decode(params.get('queries', '')))
        return stats, queries

    if request.method != 'GET':
        content_type = request.META.get(
            'HTTP_CONTENT_TYPE', request.META.get('CONTENT_TYPE', '')
        )
        # Drop parameters such as '; boundary=...' or '; charset=...'.
        media_type = (content_type or '').split(';', 1)[0].strip().lower()
        form_types = ('multipart/form-data', 'application/x-www-form-urlencoded')
        if media_type not in form_types:
            return None
    if (request.REQUEST.get('profile', False) and
            (settings.DEBUG == True or request.user.is_staff)):
        request.statsfile = tempfile.NamedTemporaryFile()
        params = request.REQUEST
        if (params.get('show_stats', False)
                and params.get('show_queries', '1') == '1'):
            # Instantly re-sort the existing stats data
            stats, queries = unpickle(params)
            return display_stats(request, stats, queries)
        elif (params.get('show_queries', False)
              and params.get('show_stats', '1') == '1'):
            stats, queries = unpickle(params)
            return display_queries(request, stats, queries)
        else:
            # We don't have previous data, so initialize the profiler
            request.profiler = hotshot.Profile(request.statsfile.name)
            reset_queries()
    return None
middleware.py 文件源码 项目:django-icekit 作者: ic-labs 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def process_response(self, request, response):
    """Finish profiling and render the results.

    A request without a (truthy) ``profiler`` attribute passes through
    unchanged.  Otherwise the profiler is closed, the stats are loaded from
    ``request.statsfile`` and the response is replaced by the output of
    ``display_queries`` (when 'show_queries' is set and 'show_stats' is '1'
    or absent) or of ``display_stats``.
    """
    profiler = getattr(request, 'profiler', None)
    if profiler:
        profiler.close()
        params = request.REQUEST
        stats = hotshot.stats.load(request.statsfile.name)
        queries = connection.queries
        show_queries = (params.get('show_queries', False)
                        and params.get('show_stats', '1') == '1')
        # Only the chosen renderer's name is looked up.
        render = display_queries if show_queries else display_stats
        response = render(request, stats, queries)
    return response
views.py 文件源码 项目:WebHashcat 作者: hegusung 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def api_hashfile_top_password(request, hashfile_id, N):
    """Return the most frequent cracked passwords of a hashfile as JSON.

    Responds with 404 when no ``Hashfile`` has id ``hashfile_id``.  The JSON
    body holds two parallel lists, ``top_password_list`` and ``count_list``,
    ordered by descending count.

    ``N`` is the maximum number of passwords to return.  It was previously
    ignored in favour of a hard-coded ``LIMIT 10``; values that are not a
    positive integer fall back to 10 so existing callers keep working.
    ``N`` is presumably a URL capture and may arrive as a string -- TODO
    confirm against the URL configuration.
    """
    hashfile = get_object_or_404(Hashfile, id=hashfile_id)

    try:
        limit = int(N)
    except (TypeError, ValueError):
        limit = 10
    if limit <= 0:
        limit = 10

    pass_count_list = Cracked.objects.raw("SELECT id, password, COUNT(*) AS count FROM Hashcat_cracked USE INDEX (hashfileid_id_index) WHERE hashfile_id=%s GROUP BY BINARY password ORDER BY count DESC LIMIT %s", [hashfile.id, limit])

    # Materialise once: iterating the raw queryset is what runs the query.
    rows = list(pass_count_list)

    res = {
        "top_password_list": [row.password for row in rows],
        "count_list": [row.count for row in rows],
    }

    # Debug output: SQL text and time of the most recently logged query.
    for query in connection.queries[-1:]:
        print(query["sql"])
        print(query["time"])

    return HttpResponse(json.dumps(res), content_type="application/json")
middleware.py 文件源码 项目:jiango 作者: yefei 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def process_response(self, request, response):
    """Dump the logged SQL queries to standard output, then pass the response on.

    Nothing is written when the query log is empty.  Otherwise a header
    line, one "[time] sql" line per query and a footer line are written.
    The response is returned unchanged in every case.
    """
    if not connection.queries:
        return response

    emit = sys.stdout.write
    emit("SQL %s\n" % ('=' * 26))
    for entry in connection.queries:
        emit("[%s] %s\n" % (entry['time'], entry['sql']))
    emit("%s\n" % ('=' * 30))
    return response
utils.py 文件源码 项目:montage 作者: storyful 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def executemany(self, sql, param_list):
    """
        Record a batch of SQL queries in an appstats trace.

        Starts recording, delegates to the parent ``executemany`` and --
        whether or not that call raises -- ends the recording with a label
        of the form "<count> times: <sql>".
    """
    self.start_appstats_recording()
    try:
        return super(CursorDebugWrapper, self).executemany(sql, param_list)
    finally:
        # param_list could be an iterator, which has no len()
        try:
            batch_size = len(param_list)
        except TypeError:
            batch_size = '?'
        self.end_appstats_recording(
            '{0} times: {1}'.format(batch_size, sql))
context_processors.py 文件源码 项目:djanoDoc 作者: JustinChavez 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def debug(request):
    """
    Return context variables helpful for debugging.

    The result is empty unless ``settings.DEBUG`` is truthy and the
    request's REMOTE_ADDR is listed in ``settings.INTERNAL_IPS``; in that
    case it holds ``debug`` (True) and ``sql_queries``.
    """
    is_internal_debug = (
        settings.DEBUG
        and request.META.get('REMOTE_ADDR') in settings.INTERNAL_IPS
    )
    if not is_internal_debug:
        return {}

    from django.db import connection
    return {
        'debug': True,
        # A lazy reference that reads connection.queries on access, so it
        # also contains queries triggered after this function has run.
        'sql_queries': lazy(lambda: connection.queries, list),
    }
context_processors.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def debug(_request):
    """Return context variables helpful for debugging.

    Same as django.core.context_processors.debug, just without the check
    against INTERNAL_IPS: when ``settings.DEBUG`` is truthy the result holds
    ``debug`` (True) and ``sql_queries`` (``connection.queries``); otherwise
    it is empty.  The request argument is not used."""
    if not settings.DEBUG:
        return {}

    from django.db import connection
    return {
        'debug': True,
        'sql_queries': connection.queries,
    }
debug.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def log_last_django_query(logger):
    """Write the most recent SQL query made by Django to the debug log.

    Will only work if DEBUG=True in the Django settings.  Nothing is logged
    when the query log is empty.

    :param logger: The logging.Logger object to use for logging.
    """
    # _manage is not referenced below; the import is kept as in the original
    # (presumably for its import-time side effects -- TODO confirm).
    from nav.models import manage as _manage
    from django.db import connection

    if not connection.queries:
        return
    latest = connection.queries[-1]
    logger.debug("Last Django SQL query was: %s", latest['sql'])


问题


面经


文章

微信
公众号

扫码关注公众号