import contextlib
import itertools

from django.db import connection


@contextlib.contextmanager
def assertQueries(*prefixes):
    """Assert the correct queries are efficiently executed for a block."""
    count = len(connection.queries)
    yield
    for prefix, query in itertools.izip_longest(prefixes, connection.queries[count:]):
        # Every executed query must pair up with an expected prefix, in order.
        assert prefix and query and query['sql'].startswith(prefix), (prefix, query)
        cursor = connection.cursor()
        cursor.execute('EXPLAIN ' + query['sql'])
        plan = ''.join(row for row, in cursor)
        # Anything other than an INSERT should be satisfied by an index scan.
        assert prefix == 'INSERT' or 'Index Scan' in plan, (plan, query)
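A minimal usage sketch, assuming settings.DEBUG = True (otherwise Django does not record connection.queries) and a hypothetical Book model; note the 'Index Scan' check matches PostgreSQL EXPLAIN output:

# Hedged sketch; `Book` is a hypothetical model used only for illustration.
with assertQueries('SELECT', 'UPDATE'):
    book = Book.objects.get(pk=1)                 # SELECT ... by primary key
    Book.objects.filter(pk=1).update(title='x')   # UPDATE ...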
Source file: test_available_jobs.py (project: intel-manager-for-lustre, author: intel-hpdd)
def test_locks_query_count(self):
"""Check that query count to pull in available jobs hasn't changed"""
EXPECTED_QUERIES = 6 # but 3 are for setup
host_ct_key = ContentType.objects.get_for_model(
self.host.downcast()).natural_key()
host_id = self.host.id
    # Create 200 host up/down jobs in the default 'pending' state;
    # the key point is that they are not in the 'complete' state.
for job_num in xrange(200):
if job_num % 2 == 0:
RebootHostJob.objects.create(host=self.host)
else:
ShutdownHostJob.objects.create(host=self.host)
    # Load up the caches, including the _lock_cache, which should find
    # these jobs.
js = JobScheduler()
reset_queries()
# Getting jobs here may incur a higher cost.
js.available_jobs([(host_ct_key, host_id), ])
query_sum = len(connection.queries)
self.assertGreaterEqual(query_sum, EXPECTED_QUERIES,
"something changed with queries! "
"got %s expected %s" % (query_sum, EXPECTED_QUERIES))
Source file: test_available_transitions.py (project: intel-manager-for-lustre, author: intel-hpdd)
def test_locks_query_count(self):
"""Check that query count to pull in available jobs hasn't changed"""
EXPECTED_QUERIES = 0
# object to be locked by jobs
host_ct_key = ContentType.objects.get_for_model(
self.host.downcast()).natural_key()
host_id = self.host.id
    # Create 200 host up/down jobs in the default 'pending' state;
    # the key point is that they are not in the 'complete' state.
for job_num in xrange(200):
if job_num % 2 == 0:
RebootHostJob.objects.create(host=self.host)
else:
ShutdownHostJob.objects.create(host=self.host)
    # Load up the caches, including the _lock_cache, which should find
    # these jobs.
js = JobScheduler()
reset_queries()
# Getting jobs here may incur a higher cost.
js.available_jobs([(host_ct_key, host_id), ])
query_sum = len(connection.queries)
self.assertEqual(query_sum, EXPECTED_QUERIES,
"something changed with queries! "
"got %s expected %s" % (query_sum, EXPECTED_QUERIES))
def show_category(request, category_slug, template_name="catalog/category.html"):
""" view for each individual category page """
category_cache_key = request.path
c = cache.get(category_cache_key)
if not c:
c = get_object_or_404(Category.active, slug=category_slug)
cache.set(category_cache_key, c, CACHE_TIMEOUT)
products = c.product_set.filter(is_active=True)
page_title = c.name
meta_keywords = c.meta_keywords
meta_description = c.meta_description
from django.db import connection
queries = connection.queries
return render_to_response(template_name, locals(), context_instance=RequestContext(request))
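The view is a cache-aside lookup keyed on the request path: check the cache, fall back to the database, then populate the cache. The same pattern as a hypothetical reusable helper (the name and timeout argument are illustrative, not part of the original project):

# Hedged sketch of the cache-aside pattern used above.
def get_cached_or_404(cache_key, queryset, timeout, **lookup):
    obj = cache.get(cache_key)
    if obj is None:
        obj = get_object_or_404(queryset, **lookup)
        cache.set(cache_key, obj, timeout)
    return obj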
def process_response(self, request, response):
from sys import stdout
if stdout.isatty():
for query in connection.queries:
print "\033[1;31m[%s]\033[0m \033[1m%s\033[0m" % (
query['time'], " ".join(query['sql'].split()))
return response
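For the ANSI-coloured query dump to appear, the class has to be registered as middleware; a hedged Django 1.x-era sketch, with a hypothetical dotted path:

# Hedged sketch: settings.py for old-style (pre-1.10) middleware.
MIDDLEWARE_CLASSES = (
    'myproject.middleware.TerminalSQLMiddleware',  # hypothetical path
    # ... the rest of the stack ...
)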
def render_queries(queries, sort):
"""
Returns a StringIO containing the formatted SQL queries.
_sort_ is a field to sort by.
"""
output = StringIO()
if sort == 'order':
print >>output, " time query"
for query in queries:
print >>output, " %8s %s" % (query["time"], query["sql"])
return output
def sorter(x, y):
if sort == 'time':
return cmp(x[1][1], y[1][1])
elif sort == 'queries':
return cmp(x[1][0], y[1][0])
else:
raise RuntimeError("Unknown sort: %s" % sort)
print >>output, " queries time query"
results = {}
for query in queries:
try:
result = results[query["sql"]]
result[0] += 1
result[1] += Decimal(query["time"])
except KeyError:
results[query["sql"]] = [1, Decimal(query["time"])]
results = sorted(results.iteritems(), cmp=sorter, reverse=True)
for result in results:
print >>output, " %8d %8.3f %s" % (result[1][0],
result[1][1],
result[0])
return output
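render_queries takes the raw dicts Django stores in connection.queries (each with 'sql' and 'time' keys); sort may be 'order' (chronological), 'time', or 'queries' (by repeat count). A hedged usage sketch:

# Hedged sketch: print a report of the queries run so far.
from django.db import connection

report = render_queries(connection.queries, 'time')
print(report.getvalue())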
def display_stats(request, stats, queries):
"""
Generate a HttpResponse of functions for a profiling run.
_stats_ should contain a pstats.Stats of a hotshot session.
_queries_ should contain a list of SQL queries.
"""
sort = [request.REQUEST.get('sort_first', 'time'),
request.REQUEST.get('sort_second', 'calls')]
format = request.REQUEST.get('format', 'print_stats')
sort_first_buttons = RadioButtons('sort_first', sort[0],
sort_categories)
sort_second_buttons = RadioButtons('sort_second', sort[1],
sort_categories)
format_buttons = RadioButtons('format', format,
(('print_stats', 'by function'),
('print_callers', 'by callers'),
('print_callees', 'by callees')))
output = render_stats(stats, sort, format)
output.reset()
output = [html.escape(unicode(line)) for line in output.readlines()]
response = HttpResponse(mimetype='text/html; charset=utf-8')
response.content = (stats_template %
{'format_buttons': format_buttons,
'sort_first_buttons': sort_first_buttons,
'sort_second_buttons': sort_second_buttons,
'rawqueries' : b64encode(cPickle.dumps(queries)),
'rawstats': b64encode(pickle_stats(stats)),
'stats': "".join(output),
'url': request.path})
return response
def process_request(self, request):
"""
Setup the profiler for a profiling run and clear the SQL query log.
If this is a resort of an existing profiling run, just return
the resorted list.
"""
def unpickle(params):
stats = unpickle_stats(b64decode(params.get('stats', '')))
queries = cPickle.loads(b64decode(params.get('queries', '')))
return stats, queries
if request.method != 'GET' and \
not (request.META.get('HTTP_CONTENT_TYPE',
request.META.get('CONTENT_TYPE', '')) in
['multipart/form-data', 'application/x-www-form-urlencoded']):
return
if (request.REQUEST.get('profile', False) and
(settings.DEBUG == True or request.user.is_staff)):
request.statsfile = tempfile.NamedTemporaryFile()
params = request.REQUEST
if (params.get('show_stats', False)
and params.get('show_queries', '1') == '1'):
# Instantly re-sort the existing stats data
stats, queries = unpickle(params)
return display_stats(request, stats, queries)
elif (params.get('show_queries', False)
and params.get('show_stats', '1') == '1'):
stats, queries = unpickle(params)
return display_queries(request, stats, queries)
else:
# We don't have previous data, so initialize the profiler
request.profiler = hotshot.Profile(request.statsfile.name)
reset_queries()
def process_response(self, request, response):
"""Finish profiling and render the results."""
profiler = getattr(request, 'profiler', None)
if profiler:
profiler.close()
params = request.REQUEST
stats = hotshot.stats.load(request.statsfile.name)
queries = connection.queries
if (params.get('show_queries', False)
and params.get('show_stats', '1') == '1'):
response = display_queries(request, stats, queries)
else:
response = display_stats(request, stats, queries)
return response
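Taken together, the hooks above form one profiling middleware: process_request arms a hotshot profiler when ?profile=1 is passed (and DEBUG is on or the user is staff), and process_response renders either the function stats or the SQL report. A hedged sketch of driving it with Django's test client; the URL is hypothetical:

# Hedged sketch; '/reports/' stands in for any profiled view.
from django.test import Client

client = Client()
client.get('/reports/', {'profile': '1'})                       # stats report
client.get('/reports/', {'profile': '1', 'show_queries': '1'})  # SQL report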
def process_response(self, request, response):
if settings.DEBUG:
self.db_qcount = len(connection.queries)
self.db_time += sum([float(q['time']) for q in connection.queries])
if 'text/html' in response.get('Content-Type', ''):
soup = BeautifulSoup(response.content)
if soup.body:
tag = soup.new_tag('code', style='position: fixed; top: 0; left: 0px')
tag.string = 'DB took: %s, DB queries count: %s' % (str(self.db_time), str(self.db_qcount))
soup.body.insert(0, tag)
response.content = soup.prettify()
return response
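The += on self.db_time implies the counters are reset elsewhere; a hedged sketch of the process_request counterpart this middleware presumably pairs with (attribute names follow the snippet, the reset logic is an assumption):

# Hedged sketch: reset the per-request counters before the view runs.
def process_request(self, request):
    self.db_qcount = 0
    self.db_time = 0.0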
def api_hashfile_top_password(request, hashfile_id, N):
if request.method == "POST":
params = request.POST
else:
params = request.GET
hashfile = get_object_or_404(Hashfile, id=hashfile_id)
    pass_count_list = Cracked.objects.raw("SELECT id, password, COUNT(*) AS count FROM Hashcat_cracked USE INDEX (hashfileid_id_index) WHERE hashfile_id=%s GROUP BY BINARY password ORDER BY count DESC LIMIT %s", [hashfile.id, int(N)])
top_password_list = []
count_list = []
for item in pass_count_list:
top_password_list.append(item.password)
count_list.append(item.count)
res = {
"top_password_list": top_password_list,
"count_list": count_list,
}
for query in connection.queries[-1:]:
print(query["sql"])
print(query["time"])
return HttpResponse(json.dumps(res), content_type="application/json")
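A hedged way to exercise the endpoint, assuming a URL pattern routes hashfile_id and N to this view (the path below is hypothetical):

# Hedged sketch; the URL shape depends on the project's urlconf.
from django.test import Client

resp = Client().get('/api/hashfile/1/top/10')
print(resp.content)   # JSON body: {"top_password_list": [...], "count_list": [...]}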
def process_response(self, request, response):
if connection.queries:
sys.stdout.write("SQL %s\n" % ('=' * 26))
for query in connection.queries:
sys.stdout.write("[%s] %s\n" % (query['time'], query['sql']))
sys.stdout.write("%s\n" % ('=' * 30))
return response
def executemany(self, sql, param_list):
"""
Outputs a batch of SQL queries to an appstats trace
"""
self.start_appstats_recording()
try:
return super(CursorDebugWrapper, self).executemany(sql, param_list)
finally:
try:
times = len(param_list)
except TypeError: # param_list could be an iterator
times = '?'
sql = '{0} times: {1}'.format(times, sql)
self.end_appstats_recording(sql)
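The wrapper tags the recorded SQL with the batch size, so a three-row batch is recorded as "3 times: INSERT ...". A hedged sketch of a call that would flow through it; the table name is hypothetical:

# Hedged sketch: a batch insert recorded as
# '3 times: INSERT INTO demo_table (val) VALUES (%s)'.
from django.db import connection

cursor = connection.cursor()
cursor.executemany(
    "INSERT INTO demo_table (val) VALUES (%s)",
    [(1,), (2,), (3,)],
)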
def debug(request):
"""
Returns context variables helpful for debugging.
"""
context_extras = {}
if settings.DEBUG and request.META.get('REMOTE_ADDR') in settings.INTERNAL_IPS:
context_extras['debug'] = True
from django.db import connection
# Return a lazy reference that computes connection.queries on access,
# to ensure it contains queries triggered after this function runs.
context_extras['sql_queries'] = lazy(lambda: connection.queries, list)
return context_extras
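A context processor only runs if it is registered in settings; a hedged Django 1.x-era sketch with a hypothetical dotted path to the function above:

# Hedged sketch: registering the context processor (old-style settings).
TEMPLATE_CONTEXT_PROCESSORS = (
    'django.contrib.auth.context_processors.auth',
    'myproject.context_processors.debug',  # hypothetical path
)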
def debug(_request):
"""Returns context variables helpful for debugging.
Same as django.core.context_processors.debug, just without the check
against INTERNAL_IPS."""
context_extras = {}
if settings.DEBUG:
context_extras['debug'] = True
from django.db import connection
context_extras['sql_queries'] = connection.queries
return context_extras
def log_last_django_query(logger):
    """Debug-logs the latest SQL query made by Django.

    Will only work if DEBUG = True in the Django settings.

    :param logger: The logging.Logger object to use for logging.
    """
    from nav.models import manage as _manage  # unused directly; presumably imported for its side effects
    from django.db import connection
if connection.queries:
logger.debug("Last Django SQL query was: %s",
connection.queries[-1]['sql'])
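A hedged usage sketch (requires DEBUG = True so connection.queries is populated):

# Hedged sketch: wire up a DEBUG-level logger and dump the last query.
import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
# ... run some ORM query here ...
log_last_django_query(logger)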