import random  # needed by to_query below


def get_sample_results(cls):
    # (a classmethod on the query-collector class in its original context)
    read = """QUERY - 'SELECT "auth_user"."id" FROM "auth_group"'""" \
           """ - PARAMS = ()"""
    write = """QUERY - 'UPDATE "auth_group" SET "name" = %s'""" \
            """ - PARAMS = ('bar',)"""
    other = """QUERY - 'BEGIN TRANSACTION' - PARAMS = ()"""

    def to_query(sql):
        return {'sql': sql, 'time': '%.3f' % random.random()}

    def to_single_result(*sqls):
        qc = cls()
        qc.queries = [to_query(sql) for sql in sqls]
        return qc.get_results_to_send()

    return [
        to_single_result(*sqls)
        for sqls in [
            [read], [read, write], [read, read], [write, write],
            [other, other], [read, write, other]
        ]
    ]
Example source code for Python queries() usage
def test_debug_sql_logger(caplog, settings):
    import re

    from django.db import connection
    from pootle_project.models import Project

    settings.DEBUG = True
    queries = len(connection.queries)
    log_new_queries(queries)
    assert caplog.records == []
    # Trigger some SQL, then log the new queries.
    Project.objects.count()
    log_new_queries(queries)
    timing = caplog.records[0].message
    sql = caplog.records[1].message
    # The first record is the timing, the second is the SQL.
    assert re.match(r"^\d+?\.\d+?$", timing)
    assert "SELECT COUNT" in sql
    assert "pootle_app_project" in sql
def log_test_timing(debug_logger, timings, name, start):
    import time

    from django.db import connection

    time_taken = time.time() - start
    timings["tests"][name] = dict(
        slow_queries=[
            q for q
            in connection.queries
            if float(q["time"]) > 0],
        query_count=len(connection.queries),
        timing=time_taken)
    debug_logger.debug(
        "{: <70} {: <10} {: <10}".format(
            *(name,
              round(time_taken, 4),
              len(connection.queries))))
def log_test_report(debug_logger, timings):
    import time
    from datetime import datetime

    debug_logger.debug(
        "%s\nTESTS END: %s",
        "=" * 80,
        datetime.now())
    total_time = time.time() - timings["start"]
    total_queries = sum(
        t["query_count"]
        for t
        in timings["tests"].values())
    if total_queries:
        avg_query_time = total_time / total_queries
        debug_logger.debug(
            "TESTS AVERAGE query time: %s",
            avg_query_time)
    debug_logger.debug(
        "TESTS TOTAL test time: %s",
        total_time)
    debug_logger.debug(
        "TESTS TOTAL queries: %s",
        total_queries)
    debug_logger.debug("%s\n" % ("=" * 80))
@contextlib.contextmanager  # generator-based: the yield marks the block under test
def assertQueries(self, *prefixes):
    "Assert the correct queries are efficiently executed for a block."
    debug = connection.use_debug_cursor
    connection.use_debug_cursor = True
    count = len(connection.queries)
    yield
    if isinstance(prefixes[0], int):
        # An int argument asserts the exact number of queries run.
        assert prefixes[0] == len(connection.queries[count:])
    else:
        # String arguments assert each query's SQL prefix, in order.
        for prefix, query in itertools.izip_longest(prefixes, connection.queries[count:]):
            assert prefix and query and query['sql'].startswith(prefix), (prefix, query)
    connection.use_debug_cursor = debug
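
A minimal usage sketch for the context manager above (the Book model is hypothetical, and assertQueries is assumed to be mixed into a Django TestCase):

from django.test import TestCase

class BookQueryTests(TestCase):
    def test_block_queries(self):
        # Exact-count form:
        with self.assertQueries(1):
            list(Book.objects.all())
        # Prefix form, one prefix per expected query, in order:
        with self.assertQueries('SELECT', 'UPDATE'):
            list(Book.objects.all())
            Book.objects.update(name='bar')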
Source file: test_available_transitions.py (project: intel-manager-for-lustre, author: intel-hpdd)
def test_no_locks_query_count(self):
    """Check that the query count to pull in available jobs hasn't changed.

    If this test fails, consider updating EXPECTED_QUERIES, or
    investigate why the count regressed.
    """
    EXPECTED_QUERIES = 0
    # No jobs are locking this object.
    host_ct_key = ContentType.objects.get_for_model(
        self.host.downcast()).natural_key()
    host_id = self.host.id
    # Load up the caches.
    js = JobScheduler()
    reset_queries()
    js.available_transitions([(host_ct_key, host_id, ), ])
    query_sum = len(connection.queries)
    self.assertEqual(query_sum, EXPECTED_QUERIES,
                     "something changed with queries! "
                     "got %s expected %s" % (query_sum, EXPECTED_QUERIES))
from functools import wraps

from django.db import connection


def query_statistic(func):
    """Print how many queries a call performed and their combined time."""
    @wraps(func)
    def func_wrapper(*args, **kwargs):
        query_count = len(connection.queries)
        total_time = 0.0
        result = func(*args, **kwargs)
        queries_number = len(connection.queries) - query_count
        # Slice from the recorded starting point; the original [-queries_number:]
        # slice returned the whole log when no queries had run.
        performed_query_list = connection.queries[query_count:]
        for query in performed_query_list:
            if query['sql'] == 'BEGIN':
                queries_number -= 1  # ignore the transaction BEGIN
            else:
                total_time += float(query['time'])
        message = "[Statistics] : {total} queries performed in {time}s."
        print(message.format(total=queries_number, time=total_time))
        return result
    return func_wrapper
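
A usage sketch for the decorator above (the function and model are hypothetical; note connection.queries is only populated when settings.DEBUG is True):

@query_statistic
def load_books():
    return list(Book.objects.all())  # Book is a hypothetical model

books = load_books()  # prints e.g. "[Statistics] : 1 queries performed in 0.002s."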
def display_queries(request, stats, queries):
    """
    Generate an HttpResponse of SQL queries for a profiling run.

    _stats_ should contain a pstats.Stats of a hotshot session.
    _queries_ should contain a list of SQL queries.
    """
    # Note: legacy Python 2 / pre-1.8 Django code (request.REQUEST,
    # mimetype=, unicode, cPickle).
    sort = request.REQUEST.get('sort_by', 'time')
    sort_buttons = RadioButtons('sort_by', sort,
                                (('order', 'by order'),
                                 ('time', 'time'),
                                 ('queries', 'query count')))
    output = render_queries(queries, sort)
    output.reset()
    output = [html.escape(unicode(line))
              for line in output.readlines()]
    response = HttpResponse(mimetype='text/html; charset=utf-8')
    response.content = (queries_template %
                        {'sort_buttons': sort_buttons,
                         'num_queries': len(queries),
                         'queries': "".join(output),
                         'rawqueries': b64encode(cPickle.dumps(queries)),
                         'rawstats': b64encode(pickle_stats(stats)),
                         'url': request.path})
    return response
def csv_masks(request, hashfile_id):
    hashfile = get_object_or_404(Hashfile, id=hashfile_id)
    # Couldn't find a clean way to express this in the pure Django ORM,
    # so fall back to raw SQL.
    res = Cracked.objects.raw("SELECT id, password_mask, COUNT(*) AS count FROM Hashcat_cracked USE INDEX (hashfileid_id_index) WHERE hashfile_id=%s GROUP BY password_mask ORDER BY count DESC", [hashfile.id])
    fp = tempfile.SpooledTemporaryFile(mode='w')
    csvfile = csv.writer(fp, quotechar='"', quoting=csv.QUOTE_ALL)
    for item in res:
        csvfile.writerow([item.count, item.password_mask])
    fp.seek(0)  # rewind the file handle
    csvfile_data = fp.read()
    # Debug: dump the raw query that just ran, with its timing.
    for query in connection.queries[-1:]:
        print(query["sql"])
        print(query["time"])
    response = HttpResponse(csvfile_data, content_type='application/force-download')  # mimetype was renamed to content_type in Django 1.7
    response['Content-Disposition'] = 'attachment; filename=%s_masks.csv' % hashfile.name
    return response
def log_sql_queries_to_console(path):
    """
    Log SQL queries to the terminal when in debug mode.

    We need to import connection at runtime because this is used in the
    wsgi handlers for the API endpoints, and django settings are not
    available there at import time.
    """
    from django.db import connection
    if settings.DEBUG and len(connection.queries) > 0:
        total_time = 0
        output = "\033[1;31m[Request Started: %s]\033[0m\n" % path
        for query in connection.queries:
            total_time += float(query.get('time'))
            output = output + "\033[1;31m[%s]\033[0m \033[1m%s\033[0m\n" % (
                query.get('time'), " ".join(query['sql'].split()))
        output = output + "\033[1;31m[Request Finished: %s queries in %s seconds]\033[0m" % (
            len(connection.queries), total_time)
        print(output.encode('utf-8'))
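
A usage sketch (handler names below are hypothetical): call it after the work is done, so connection.queries is fully populated:

def api_endpoint(environ, start_response):
    response = handle_request(environ, start_response)  # hypothetical worker
    log_sql_queries_to_console(environ.get('PATH_INFO', ''))
    return response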
def __call__(self, request):
    # Number of DB queries already recorded before the view runs.
    n = len(connection.queries)
    # Time the view.
    response = self.get_response(request)
    if not settings.DEBUG and not n:
        return response
    # Compute the DB time for the queries just run.
    db_queries = len(connection.queries) - n
    if db_queries:
        db_time = sum(float(q['time']) for q in connection.queries[n:])
    else:
        db_time = 0.0
    if 'text/html' in response.get('Content-Type', ''):
        response.content = response.content.replace(
            '<p id="response-time-db">',
            '<p class="bg-info">Database took %s' % str(db_time))
    return response
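
The method above follows Django's new-style middleware protocol (__init__ taking get_response, plus __call__). To enable a middleware like it, add its dotted path to MIDDLEWARE; the module path below is hypothetical:

MIDDLEWARE = [
    'myapp.middleware.DbTimeMiddleware',  # hypothetical home of the class above
    # ... the stock Django middleware ...
]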
def test_traverse_GFK(self):
    """
    Test that we can traverse a 'content_object' with prefetch_related() and
    get to related objects on the other side (assuming it is suitably
    filtered)
    """
    TaggedItem.objects.create(tag="awesome", content_object=self.book1)
    TaggedItem.objects.create(tag="awesome", content_object=self.book2)
    TaggedItem.objects.create(tag="awesome", content_object=self.book3)
    TaggedItem.objects.create(tag="awesome", content_object=self.reader1)
    TaggedItem.objects.create(tag="awesome", content_object=self.reader2)
    ct = ContentType.objects.get_for_model(Book)
    # We get 3 queries - 1 for the main query, 1 for the content_objects
    # since they all use the same table, and 1 for the 'read_by' relation.
    with self.assertNumQueries(3):
        # If we limit to books, we know that they will have 'read_by'
        # attributes, so the following makes sense:
        qs = TaggedItem.objects.filter(content_type=ct, tag='awesome').prefetch_related('content_object__read_by')
        readers_of_awesome_books = {r.name for tag in qs
                                    for r in tag.content_object.read_by.all()}
        self.assertEqual(readers_of_awesome_books, {"me", "you", "someone"})
def setup_sending_before_clearing_queries_log_signal():
    class SignalSendingBeforeClearingQueriesProxy(DelegatingProxy):
        def clear(self):
            # Announce the queries before Django's reset_queries() wipes them.
            before_clearing_queries_log.send(sender=None, queries=tuple(self))
            self.wrapped.clear()

    connection.queries_log = SignalSendingBeforeClearingQueriesProxy(
        connection.queries_log)
def __init__(self, queries, name):
    self.queries = queries
    super(QueryCountResult, self).__init__(
        name=name, value=self.number_of_queries)

@property  # used as a plain value in __init__ above, so presumably a property
def number_of_queries(self):
    return len(self.queries)

def __enter__(self):
    self.queries = []
    self.nr_of_queries_when_entering = len(connection.queries)
    self.orig_force_debug_cursor = connection.force_debug_cursor
    connection.force_debug_cursor = True
    before_clearing_queries_log.connect(
        self.queries_about_to_be_reset_handler)
    return self

def queries_about_to_be_reset_handler(self,
                                      signal, sender, queries, **kwargs):
    # reset_queries() is about to clear the log: stash what we have and
    # count from the top of the fresh log afterwards.
    self.store_queries()
    self.nr_of_queries_when_entering = 0

def store_queries(self):
    self.queries += connection.queries[self.nr_of_queries_when_entering:]
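
A usage sketch for the pieces above, assuming the class also defines a matching __exit__ that calls store_queries(), restores force_debug_cursor, and disconnects the signal handler:

with QueryCountResult(queries=[], name='homepage') as result:
    client.get('/')  # hypothetical Django test client call
print(result.number_of_queries)  # counts queries even across reset_queries()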
def lsql():
    """Shell helper: print the most recent query (requires DEBUG=True)."""
    print(connection.queries[-1])
def debug(request):
    """
    Returns context variables helpful for debugging.
    """
    context_extras = {}
    if settings.DEBUG and request.META.get('REMOTE_ADDR') in settings.INTERNAL_IPS:
        context_extras['debug'] = True
        from django.db import connection
        # Return a lazy reference that computes connection.queries on access,
        # to ensure it contains queries triggered after this function runs.
        context_extras['sql_queries'] = lazy(lambda: connection.queries, list)
    return context_extras
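
This appears to be Django's built-in django.template.context_processors.debug; if so, it is enabled per template engine in settings, and only fires for addresses listed in INTERNAL_IPS:

TEMPLATES = [{
    'BACKEND': 'django.template.backends.django.DjangoTemplates',
    'APP_DIRS': True,
    'OPTIONS': {
        'context_processors': [
            'django.template.context_processors.debug',
        ],
    },
}]

INTERNAL_IPS = ['127.0.0.1']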
def test_fetch(data):
    settings.DEBUG = True
    base_queries = len(connection.queries)
    book = Book.objects.first()
    assert len(connection.queries) - base_queries == 1
    author = book.fetch_author()
    assert len(connection.queries) - base_queries == 2
    assert author.id == book.author_id
def test_queryset_methods2(data):
    settings.DEBUG = True
    base_queries = len(connection.queries)
    book_qs = Book.objects.all()
    assert len(connection.queries) - base_queries == 0  # querysets are lazy
    book_list = book_qs.to_list()
    assert len(connection.queries) - base_queries == 1
    # to_list() does not cache: a second call hits the database again.
    book_qs.to_list()
    assert len(connection.queries) - base_queries == 2
def process_response(self, request, response):
    if settings.DEBUG:
        for query in connection.queries:
            print(
                "\033[1;31m[%s]\033[0m \033[1m%s\033[0m" % (query['time'], " ".join(query['sql'].split())))
    return response
def log_new_queries(queries, debug_logger=None):
    from django.db import connection

    debug_logger = debug_logger or logger
    new_queries = list(connection.queries[queries:])
    for query in new_queries:
        debug_logger.debug(query["time"])
        debug_logger.debug("\t%s", query["sql"])
@contextmanager  # assumes "from contextlib import contextmanager"
def debug_sql(debug_logger=None):
    from django.conf import settings
    from django.db import connection

    debug = settings.DEBUG
    settings.DEBUG = True
    queries = len(connection.queries)
    try:
        yield
    finally:
        log_new_queries(
            queries,
            debug_logger)
        settings.DEBUG = debug
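
A usage sketch for the context manager above: any SQL triggered inside the block gets logged via log_new_queries:

with debug_sql():
    Project.objects.count()  # the COUNT query's time and SQL are logged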
def post(self, request):
    data = SignInSerializer().load_data(request.POST)
    user = get_user_model().objects.filter(email=data['email']).first()
    if user is None or not user.check_password(data['password']):
        raise AuthenticationError()
    token = AuthToken.objects.create(user_id=user.id)
    print(connection.queries)  # debug: dump the queries this request ran
    return ApiResponse({'token': token.value})
Source file: test_query_scaling.py (project: intel-manager-for-lustre, author: intel-hpdd)
def _measure_scaling(self, create_n, measured_resource, scaled_resource=None):
    """
    :param create_n: Function to create N of scaled_resource
    :param measured_resource: The resource we will measure the query load for
    :param scaled_resource: The object which is actually being scaled with N
    :return: Instance of Order1, OrderN, OrderBad
    """
    if scaled_resource is None:
        scaled_resource = measured_resource

    query_counts = {}
    samples = [5, 6, 7, 8]
    for n in samples:
        ObjectCache.clear()
        create_n(n)
        # Queries get reset at the start of a request, so this count() won't
        # pollute the measurement below.
        self.assertEqual(scaled_resource._meta.queryset.count(), n)
        response = self.api_client.get("/api/%s/" % measured_resource._meta.resource_name, data={'limit': 0})
        self.assertEqual(response.status_code, 200, "%s:%s" % (response.content, measured_resource._meta.resource_name))
        query_count = len(connection.queries)
        self.assertEqual(len(self.deserialize(response)['objects']), measured_resource._meta.queryset.count())
        query_counts[n] = query_count

    # Ignore samples[0]; it only clears out any setup overhead from the first API call.
    # Gradient between samples[1] and samples[2]:
    grad1 = (query_counts[samples[2]] - query_counts[samples[1]]) / (samples[2] - samples[1])
    # Gradient between samples[2] and samples[3]:
    grad2 = (query_counts[samples[3]] - query_counts[samples[2]]) / (samples[3] - samples[2])

    if grad1 == 0 and grad2 == 0:
        # Hoorah, O(1)
        return Order1(query_counts[samples[3]])
    elif grad1 > 0 and grad1 == grad2:
        # O(N)
        return OrderN(grad1)
    else:
        # Worse than O(N)
        return OrderBad()
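
A usage sketch (resource and factory names below are hypothetical) for asserting O(1) query behaviour with the helper above:

def test_host_list_query_scaling(self):
    order = self._measure_scaling(self._create_hosts, HostResource)
    self.assertIsInstance(order, Order1, "listing hosts should cost O(1) queries")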
def test_cached_hosts(self):
    instance = HostListMixin()
    instance.host_ids = json.dumps([self.hosts[1].id])
    self.assertListEqual(instance.hosts, [self.hosts[1]])
    db_hits = len(connection.queries)
    # A second access should be served from the cache: no new queries.
    self.assertListEqual(instance.hosts, [self.hosts[1]])
    self.assertEqual(db_hits, len(connection.queries))
def test_changing_hosts(self):
    instance = HostListMixin()
    instance.host_ids = json.dumps([self.hosts[1].id])
    self.assertListEqual(instance.hosts, [self.hosts[1]])
    db_hits = len(connection.queries)
    # Changing host_ids must invalidate the cache and hit the DB again.
    instance.host_ids = json.dumps([self.hosts[0].id])
    self.assertListEqual(instance.hosts, [self.hosts[0]])
    self.assertNotEqual(db_hits, len(connection.queries))