def get(self, request, format=None, *args, **kwargs):
    """
    Return all neighborhoods (simplified geometries) as a GeoJSON FeatureCollection.

    Builds the GeoJSON with a raw PostGIS query so the database does the
    serialization; hidden neighborhoods are excluded via a bound parameter.
    """
    query = """
    SELECT row_to_json(fc)
    FROM (
      SELECT 'FeatureCollection' AS type,
        array_to_json(array_agg(f)) AS features
      FROM (SELECT 'Feature' AS type,
                   ST_AsGeoJSON(g.geom_simple)::json AS geometry,
                   g.uuid AS id,
                   row_to_json((SELECT p FROM (
                      SELECT uuid AS id, name, label, state_abbrev, organization_id) AS p))
                   AS properties
            FROM pfb_analysis_neighborhood AS g WHERE g.visibility <> %s) AS f) AS fc;
    """
    with connection.cursor() as cursor:
        # Visibility value is bound as a parameter, not interpolated into SQL.
        cursor.execute(query, [Neighborhood.Visibility.HIDDEN])
        row = cursor.fetchone()  # renamed from ``json`` to stop shadowing the json module
    # fetchone() returns None when there is no row; a falsy check covers both
    # the None and the empty-tuple cases the original tested separately.
    if not row:
        return Response({})
    return Response(row[0])
Python example source code for cursor() usage (scraped-listing caption)
def getWorkChartsByPercent():
    """
    Sum the per-statement-type counters (insert/delete/update/create/alter)
    stored in the JSON ``notes`` column across all rows of TABLE.

    Returns:
        list: a list of dicts mapping column name -> summed value (one dict
        per result row; the aggregate query yields a single row).
    """
    sql = "select sum(notes->'$.insert') `insert`,sum(notes->'$.delete') `delete`,sum(notes->'$.update') `update`," \
          "sum(notes->'$.create') `create`,sum(notes->'$.alter') `alter` from %s" % TABLE
    # Context manager closes the cursor (the original leaked it).
    with connection.cursor() as cursor:
        # TABLE is a module-level identifier constant; identifiers cannot be
        # bound as query parameters, hence the %-formatting.
        cursor.execute(sql)
        field_names = [item[0] for item in cursor.description]
        rows = cursor.fetchall()
    # Pair each row's values with the column names to build one dict per row.
    return [dict(zip(field_names, row)) for row in rows]
# NOTE: the original comment here was lost to encoding corruption (mojibake).
def health(request):
    """
    Health-check endpoint: verifies database connectivity and that DEBUG is
    disabled. Returns plain-text 200 on success, 500 on either failure.
    """
    # Check database connectivity with a trivial query.
    try:
        with connection.cursor() as cursor:
            cursor.execute("select 1")
            assert cursor.fetchone()
    except Exception:  # narrowed from a bare ``except:`` so SystemExit/KeyboardInterrupt propagate
        log.exception("Database connectivity failed")
        return HttpResponse(
            "Database connectivity failed",
            content_type="text/plain", status=500)
    # Check debug mode -- it must never be on in production.
    if settings.DEBUG:
        # log.error, not log.exception: there is no active exception here,
        # so log.exception would emit a bogus "NoneType: None" traceback.
        log.error("Debug mode not allowed in production")
        return HttpResponse(
            "Debug mode not allowed in production",
            content_type="text/plain", status=500)
    return HttpResponse("Health OK", content_type='text/plain', status=200)
def validate_server_db1():
    """Server may have only single database of particular type.

    Returns:
        list: a one-element list of error text if any server has more than one
        database of the same type, otherwise an empty list.
    """
    query = """
        SELECT MAX(counted) FROM
        (
            SELECT COUNT(*) AS counted
            FROM api_db
            WHERE type = "S"
            GROUP BY server_id,type_db
        ) AS counts;
    """
    # Context manager closes the cursor (the original leaked it).
    with connection.cursor() as cursor:
        cursor.execute(query)
        res = cursor.fetchone()[0]
    # MAX() over an empty set yields NULL/None; guard so ``None > 1`` does
    # not raise TypeError on Python 3.
    if res is not None and res > 1:
        return [
            'Server may have only single database of particular type!']
    return []
def get(self, request, format=None, *args, **kwargs):
    """
    Return all neighborhoods (point geometries) as a GeoJSON FeatureCollection.

    Uses raw query for fetching as GeoJSON because it is much faster to let
    PostGIS generate it than the Django serializer.
    """
    query = """
    SELECT row_to_json(fc)
    FROM (
      SELECT 'FeatureCollection' AS type,
        array_to_json(array_agg(f)) AS features
      FROM (SELECT 'Feature' AS type, ST_AsGeoJSON(g.geom_pt)::json AS geometry, g.uuid AS id,
                   row_to_json((SELECT p FROM (
                      SELECT uuid AS id, name, label, state_abbrev, organization_id) AS p))
                   AS properties
            FROM pfb_analysis_neighborhood AS g WHERE g.visibility <> %s) AS f) AS fc;
    """
    with connection.cursor() as cursor:
        # Visibility value is bound as a parameter, not interpolated into SQL.
        cursor.execute(query, [Neighborhood.Visibility.HIDDEN])
        row = cursor.fetchone()  # renamed from ``json`` to stop shadowing the json module
    # fetchone() returns None when there is no row; falsy check covers both
    # the None and empty-tuple cases.
    if not row:
        return Response({})
    return Response(row[0])
managers.py — source file
project: django-calaccess-processed-data
author: california-civic-data-coalition
(scraped listing metadata: 25 reads, 0 bookmarks, 0 likes, 0 comments)
def load_raw_data(self):
    """
    Load the model by executing its raw sql load query.

    Temporarily drops any constraints or indexes on the model; they are
    re-added after the load only if the drop succeeded.
    """
    try:
        self.drop_constraints_and_indexes()
    except ValueError as e:
        # Dropping failed: report what is constrained/indexed and load anyway
        # with the constraints still in place.
        print(e)
        print('Constrained fields: %s' % self.constrained_fields)
        print('Indexed fields: %s' % self.indexed_fields)
        dropped = False
    else:
        dropped = True
    # Context manager replaces the manual try/finally cursor close.
    with connection.cursor() as c:
        c.execute(self.raw_data_load_query)
    if dropped:
        self.add_constraints_and_indexes()
def test_query_sanitises_date_data(self):
    """After running the sanitiser updates, the known fixture date values
    must no longer be distinguishable/present."""
    first_purchase = Purchase.objects.get(book__title="Baking things")
    second_purchase = Purchase.objects.get(book__title="Treaty Negotiations")
    clive = Person.objects.get(first_name="Clive")
    # Pre-conditions: fixtures hold distinct, known date/datetime values.
    assert first_purchase.purchased_at > second_purchase.purchased_at
    assert clive.date_of_birth == datetime.date(1920, 1, 9)
    # Context manager closes the cursor (the original leaked it).
    with connection.cursor() as cursor:
        cursor.execute(get_updates_for_model(Purchase))
        cursor.execute(get_updates_for_model(Person))
    # Re-fetch rows to observe the sanitised values.
    first_purchase = Purchase.objects.get(book__title="Baking things")
    second_purchase = Purchase.objects.get(book__title="Treaty Negotiations")
    clive = Person.objects.get(pk=clive.pk)
    assert first_purchase.purchased_at == second_purchase.purchased_at
    assert clive.date_of_birth != datetime.date(1920, 1, 9)
def faq(request: HttpRequest):
    """Render the FAQ page with aggregate site/scan statistics."""
    # Count sites whose latest scan result reports one or more server leaks.
    serverleak_query = '''SELECT
    COUNT(jsonb_array_length("result"->'leaks'))
    FROM backend_scanresult
    WHERE backend_scanresult.scan_id IN (
        SELECT backend_site.last_scan_id
        FROM backend_site
        WHERE backend_site.last_scan_id IS NOT NULL)
    AND jsonb_array_length("result"->'leaks') > 0'''
    with connection.cursor() as cur:
        cur.execute(serverleak_query)
        (failing_serverleak,) = cur.fetchone()
    context = {
        'num_scanning_sites': Scan.objects.filter(end__isnull=True).count(),
        'num_scans': Site.objects.filter(scans__isnull=False).count(),
        'num_sites': Site.objects.count(),
        'num_sites_failing_serverleak': failing_serverleak,
    }
    return render(request, 'frontend/faq.html', context)
def db_status():
    """
    Collect a handful of MySQL server status counters via SHOW STATUS.

    Returns:
        collections.OrderedDict: counter name -> int value, plus a derived
        'QPS' entry (Queries divided by Uptime, rounded to 2 places; 0 when
        the counters are missing or uptime is zero).
    """
    status = collections.OrderedDict()
    keys = ['Uptime', 'Queries', 'Threads_running', 'Slow_queries',
            'Flush_commands', 'Open_tables']
    # Context manager closes the cursor (the original leaked it).
    with connection.cursor() as cursor:
        for key in keys:
            # Bind the counter name as a query parameter rather than
            # %-formatting it into the SQL string.
            cursor.execute("SHOW STATUS LIKE %s", (key,))
            for (Variable_name, Value) in cursor:
                status[Variable_name] = int(Value)
    try:
        status['QPS'] = round(status['Queries'] / status['Uptime'], 2)
    except (KeyError, ZeroDivisionError):
        # Narrowed from a bare ``except:``: only missing counters or zero
        # uptime should fall back to 0.
        status['QPS'] = 0
    return status
def _pg(cursor):
    """
    (Re)install the PostgreSQL trigger that maintains ``api_check.alert_after``.

    Whenever a row is inserted, or its ``last_ping``/``timeout``/``grace``
    columns are updated, the trigger recomputes
    ``alert_after = last_ping + timeout + grace`` (only when ``last_ping`` is
    set). The function is CREATE OR REPLACE'd and the trigger dropped first,
    so this is safe to run repeatedly.
    """
    cursor.execute("""
        CREATE OR REPLACE FUNCTION update_alert_after()
        RETURNS trigger AS $update_alert_after$
        BEGIN
            IF NEW.last_ping IS NOT NULL THEN
                NEW.alert_after := NEW.last_ping + NEW.timeout + NEW.grace;
            END IF;
            RETURN NEW;
        END;
        $update_alert_after$ LANGUAGE plpgsql;
        DROP TRIGGER IF EXISTS update_alert_after ON api_check;
        CREATE TRIGGER update_alert_after
        BEFORE INSERT OR UPDATE OF last_ping, timeout, grace ON api_check
        FOR EACH ROW EXECUTE PROCEDURE update_alert_after();
    """)
def _sqlite(cursor):
    """
    (Re)install the SQLite trigger that maintains ``api_check.alert_after``.

    The trigger fires AFTER UPDATE of ``last_ping``/``timeout``/``grace`` and
    recomputes ``alert_after`` from the unix epoch of ``last_ping`` plus the
    timeout and grace values (divided by 1,000,000 -- presumably stored as
    microseconds; confirm against the model definition).
    """
    # Drop first so repeated runs don't fail on an existing trigger.
    cursor.execute("""
        DROP TRIGGER IF EXISTS update_alert_after;
    """)
    cursor.execute("""
        CREATE TRIGGER update_alert_after
        AFTER UPDATE OF last_ping, timeout, grace ON api_check
        FOR EACH ROW BEGIN
            UPDATE api_check
            SET alert_after =
                datetime(strftime('%s', last_ping) +
                         timeout/1000000 + grace/1000000, 'unixepoch')
            WHERE id = OLD.id;
        END;
    """)
def remove_delete_protection(*models):
    """
    Temporarily removes delete protection on any number of models

    Args:
        *models: One or more models whose tables will have delete protection temporarily removed

    NOTE(review): this is a generator that yields exactly once -- presumably
    it is wrapped with ``contextlib.contextmanager`` at its (unseen)
    definition site; confirm before calling it directly.
    """
    table_names = [model._meta.db_table for model in models]
    with connection.cursor() as cursor:
        # Table names are identifiers and cannot be bound as query parameters;
        # they come from model metadata, not user input.
        for table_name in table_names:
            cursor.execute("DROP RULE delete_protect ON {}".format(table_name))
        try:
            yield
        finally:
            # Restore the rules in reverse order even if the body raised.
            for table_name in reversed(table_names):
                cursor.execute("CREATE RULE delete_protect AS ON DELETE TO {} DO INSTEAD NOTHING".format(table_name))
def django_db_setup(django_db_setup, django_db_blocker, database_loader):
    """
    Fixture provided by pytest-django to allow for custom Django database config.

    'django_db_setup' exists in the arguments because we want to perform the normal pytest-django
    database setup before applying our own changes.
    """
    with django_db_blocker.unblock():
        with connection.cursor() as cur:
            # Decide up front whether a previously-created backup can be reused.
            load_from_existing_db = should_load_from_existing_db(database_loader, cur)
            if not load_from_existing_db:
                # Drop a wagtail table due to a bug: https://github.com/wagtail/wagtail/issues/1824
                cur.execute('DROP TABLE IF EXISTS wagtailsearch_editorspick CASCADE;')
                # Create the initial post-migration database backup to be restored before each test case
                database_loader.create_backup(db_cursor=cur)
    if load_from_existing_db:
        with django_db_blocker.unblock():
            # Open connections would block the restore; terminate them first.
            terminate_db_connections()
            database_loader.load_backup()
def terminate_db_connections():
    """Terminates active connections to the database being used by Django"""
    terminate_sql = (
        "SELECT pg_terminate_backend(pg_stat_activity.pid) "
        "FROM pg_stat_activity WHERE pid <> pg_backend_pid();"
    )
    with connection.cursor() as cur:
        cur.execute(terminate_sql)
    connection.close()
def should_load_from_existing_db(database_loader, cursor):
    """
    Helper method to determine whether or not a backup database should be loaded to begin
    test execution. A backup db should be used if that backup exists, and if the pytest config
    options don't indicate that the database should be freshly created to start the the test
    suite execution.
    Args:
        database_loader (DatabaseLoader): A DatabaseLoader instance
        cursor (django.db.connection.cursor): A database cursor
    Returns:
        bool: Whether or not a backup database should be loaded to begin test execution
    """
    # Guard clauses mirror the original short-circuiting boolean expression:
    # no reuse requested, or a fresh db demanded -> never load from backup.
    if not pytest.config.option.reuse_db:
        return False
    if pytest.config.option.create_db:
        return False
    # Otherwise load the backup only if one actually exists.
    return database_loader.has_backup(db_cursor=cursor)
def updateStatistics():
    """
    Recompute per-statement-type counters for every row of TABLE.

    For each row, splits ``sql_content`` on ';', counts statements starting
    with insert/delete/update/create/alter, merges the counts into the JSON
    ``notes`` column and writes it back.
    """
    # Context manager closes the cursor (the original leaked it).
    with connection.cursor() as cursor:
        # TABLE is a module-level identifier constant and cannot be a bind
        # parameter; everything else below is parameterized.
        cursor.execute("select id, sql_content, notes from %s" % TABLE)
        rows = cursor.fetchall()
        for row_id, sql_content, notes in rows:  # renamed from ``id`` (shadowed builtin)
            counts = {'insert': 0, 'delete': 0, 'update': 0, 'create': 0, 'alter': 0}
            for statement in sql_content.lower().split(';'):
                stripped = statement.strip()
                for key in counts:
                    if stripped.startswith(key):
                        counts[key] += 1
            notes = json.loads(notes)
            notes.update(counts)
            # Bind the JSON payload and id as parameters instead of splicing
            # them into the SQL string (the original was vulnerable to
            # injection/quoting breakage from the JSON content).
            cursor.execute(
                "update %s set notes=%%s where id=%%s" % TABLE,
                (json.dumps(notes), row_id),
            )
def getDbsFromClusterName(request):
    """
    AJAX endpoint: return the database names of the given MySQL cluster as
    ``{'status': 0, 'msg': 'ok', 'data': [...]}`` JSON.
    """
    if request.is_ajax():
        cluster_name = request.POST.get("cluster_name")
    else:
        cluster_name = request.POST['cluster_name']
    # Prefer a slave connection string; fall back to the master when no
    # slave is configured (``or`` keeps the original falsy-check semantics).
    dictConn = getSlaveConnStr(cluster_name) or getMasterConnStr(cluster_name)
    # Removed a block of commented-out dead code that queried
    # dbconfig_mysql_cluster_metadata directly.
    dbs = getMySQLClusterDbs(dictConn['Host'], dictConn['Port'],
                             dictConn['User'], dictConn['Password'])
    result = {'status': 0, 'msg': 'ok', 'data': dbs}
    return HttpResponse(json.dumps(result), content_type='application/json')
def post_migrate_mpathnode(model):
    """
    Add a CHECK constraint ensuring each row's ``ltree`` is consistent with
    its ``parent_id``: a direct child of the parent's path, or a single-label
    root path when there is no parent.

    Note: model *isn't* a subclass of MPathNode, because django migrations
    are Weird, so eligibility is detected via the presence of an LTreeField.
    """
    try:
        ltree_field = model._meta.get_field('ltree')
        if not isinstance(ltree_field, LTreeField):
            return
    except FieldDoesNotExist:
        return
    # Identifiers cannot be bound as query parameters; quote them defensively.
    names = {
        "table": quote_ident(model._meta.db_table, connection.connection),
        "check_constraint": quote_ident('%s__check_ltree' % model._meta.db_table, connection.connection),
    }
    # Context manager closes the cursor (the original leaked it).
    with connection.cursor() as cur:
        # Check that the ltree is always consistent with being a child of _parent
        cur.execute('''
            ALTER TABLE %(table)s ADD CONSTRAINT %(check_constraint)s CHECK (
                (parent_id IS NOT NULL AND ltree ~ (parent_id::text || '.*{1}')::lquery)
                OR (parent_id IS NULL AND ltree ~ '*{1}'::lquery)
            )
        ''' % names)
def domainundelete(request, fqdn):
    """Undelete ``fqdn`` on behalf of an authenticated, authorized user (POST only)."""
    if request.method != "POST":
        raise SuspiciousOperation
    if not request.user.is_authenticated() or not request.user.is_active:
        raise PermissionDenied
    # Domain names are expected in lowercase; reject anything else outright.
    if fqdn != fqdn.lower():
        raise PermissionDenied
    # The user must be authorized for this specific domain.
    if not check_handle_domain_auth(connection.cursor(),
                                    request.user.username, fqdn):
        return HttpResponseForbidden(_("Unauthorized"))
    # NOTE(review): a second, dedicated psycopg2 connection is opened here and
    # never explicitly closed -- presumably left to garbage collection; verify.
    dbh = psycopg2.connect(autoreg.conf.dbstring)
    dd = autoreg.dns.db.db(dbh)
    dd.login('autoreg')
    dd.undelete(fqdn, None)
    return HttpResponseRedirect(reverse(domainedit, args=[fqdn]))
def _gen_checksoa_log(domain, handle, nsiplist=None, doit=False,
                      newdomain=False, form=None, dnsdb=None,
                      level=autoreg.dns.check.LEVEL_NS):
    """Same as _gen_checksoa(), and keep a log of the output.

    Generator: streams each output line to the caller while accumulating it;
    once the underlying generator is exhausted, writes the full output plus
    the checker's error/warning counts to requests_log. NOTE(review): the log
    row is only written if the caller fully consumes this generator.
    """
    soac = autoreg.dns.check.SOAChecker(domain, {}, {})
    soac.set_level(level)
    rec = []  # accumulates every yielded line for the log record
    dbc = connection.cursor()
    contact = Contacts.objects.get(handle=handle.upper())
    for line in _gen_checksoa(domain, nsiplist, doit, dnsdb, soac, contact,
                              newdomain, form):
        rec.append(line)
        yield line
    # Parameterized insert of the captured output.
    dbc.execute("INSERT INTO requests_log"
                " (fqdn, contact_id, output, errors, warnings)"
                " VALUES (%s, %s, %s, %s, %s)",
                (domain, contact.id, ''.join(rec), soac.errs, soac.warns))
    # Exactly one row must have been inserted.
    assert dbc.rowcount == 1
def rqdom(request, domain):
    """
    Display the pending requests for ``domain`` (admin-only, GET only).
    Uppercase domains are redirected to the canonical rqlistdom URL.
    """
    if request.method != "GET":
        raise SuspiciousOperation
    if not request.user.is_authenticated() or not request.user.is_active:
        return HttpResponseRedirect(URILOGIN + '?next=%s' % request.path)
    login = admin_login(connection.cursor(), request.user.username)
    if not login:
        raise PermissionDenied
    if domain.upper() != domain:
        return HttpResponseRedirect(reverse(rqlistdom, args=[domain.upper()]))
    rlist = _rq_list_dom(domain)
    # enumerate() replaces the manual counter; suffixes start at 1.
    for i, r in enumerate(rlist, start=1):
        _rq1(request, r)
        r.suffix = i
    # Renamed from ``vars`` to avoid shadowing the builtin.
    context = {'rlist': rlist,
               'goto': request.GET.get('page', '')}
    return render(request, 'requests/rqdisplay.html', context)
def rqdisplaychecked(request):
    """Display checked requests (admin-only, GET only).

    BUG(review): ``domain`` is referenced below but is neither a parameter
    nor assigned anywhere in this function -- calling this view raises
    NameError. The body appears copy-pasted from rqdom(); the intended
    source of ``domain`` must be determined before this can be fixed.
    """
    if request.method != "GET":
        raise SuspiciousOperation
    if not request.user.is_authenticated() or not request.user.is_active:
        return HttpResponseRedirect(URILOGIN + '?next=%s' % request.path)
    login = admin_login(connection.cursor(), request.user.username)
    if not login:
        raise PermissionDenied
    rlist = _rq_list_dom(domain)  # NameError: ``domain`` is undefined here
    i = 1
    for r in rlist:
        _rq1(request, r)
        r.suffix = i
        i += 1
    vars = { 'rlist': rlist,
             'goto': request.GET.get('page', '') }
    return render(request, 'requests/rqdisplay.html', vars)
def rqlistdom(request, domain=None):
    """
    List requests for ``domain`` (admin-only, GET only). When ``domain`` is
    absent from the URL it is taken from the ``?domain=`` search argument;
    lowercase URL domains are redirected to their uppercase canonical form.
    """
    if request.method != "GET":
        raise SuspiciousOperation
    if not request.user.is_authenticated() or not request.user.is_active:
        return HttpResponseRedirect(URILOGIN + '?next=%s' % request.path)
    login = admin_login(connection.cursor(), request.user.username)
    if not login:
        raise PermissionDenied
    if domain is None:
        # domain not in URL, provided by "?domain=..." argument (search form)
        domain = request.GET.get('domain', '').upper()
    elif domain.upper() != domain:
        return HttpResponseRedirect(reverse(rqlistdom, args=[domain.upper()]))
    z = autoreg.zauth.ZAuth(connection.cursor())
    rlist = _rq_list_dom(domain)
    for r in rlist:
        # Skip requests on zones this admin is not authorized for.
        if not z.checkparent(r.fqdn, login):
            continue
        _rq_decorate(r)
    # Renamed from ``vars`` to avoid shadowing the builtin.
    context = {'rlist': rlist, 'fqdn': domain,
               'goto': request.GET.get('page', '')}
    return render(request, 'requests/rqlistdom.html', context)
def rq_run(out):
    """Process every pending request, printing committed/cancelled status to ``out``."""
    import autoreg.dns.db
    dd = autoreg.dns.db.db(dbc=connection.cursor())
    dd.login('autoreg')
    whoisdb = autoreg.whois.db.Main(dbc=connection.cursor())
    pending = Requests.objects.exclude(pending_state=None).order_by('id')
    for req in pending:
        # Each request is handled in its own transaction, re-fetched with a
        # row lock so concurrent runners cannot process it twice.
        with transaction.atomic():
            locked = Requests.objects.select_for_update().get(id=req.id)
            try:
                locked.do_pending_exc(out, dd, whoisdb)
                ok = True
            except IntegrityError as e:
                print(six.text_type(e), file=out)
                ok = False
            if ok:
                print(_("Status: committed"), file=out)
            else:
                print(_("Status: cancelled"), file=out)
def handle_noargs(self, **opts):
    """Copy each article's category slugs over to its tags."""
    from django.db import connection
    # Context manager closes the cursor (the original leaked it).
    with connection.cursor() as c:
        for article in Article.objects.all():
            # Fetch the slugs of every category linked to this article.
            c.execute("""SELECT c.slug
                         FROM articles_article_categories aac
                         JOIN articles_category c
                         ON aac.category_id = c.id
                         WHERE aac.article_id=%s""", (article.id,))
            names = [row[0] for row in c.fetchall()]
            tags = [Tag.objects.get_or_create(name=t)[0] for t in names]
            article.tags = tags
            article.save()
def get_max_years():
    """
    Determines the maximum number of available years to go back.

    :return: the number of years, 0 if no usable data is available
    :rtype: int
    """
    with connection.cursor() as cursor:
        # The table name is an identifier and cannot be a bind parameter.
        cursor.execute("""
            select extract(year from min(start_date))
            from %s
            where start_date > '1900-01-01'
            """ % StudentDates._meta.db_table)
        row = cursor.fetchone()  # aggregate query: at most one row matters
    min_year = row[0] if row else None
    if min_year is None:
        # Guard: the original crashed with int(None) on an empty table.
        return 0
    return int(date.today().year - min_year)
def get_schools():
    """
    Retrieves a sorted list of all the schools.

    :return: the list of schools
    :rtype: list
    """
    cursor = connection.cursor()
    cursor.execute("""
        SELECT DISTINCT(school)
        FROM %s
        WHERE char_length(school) > 0
        ORDER BY school ASC
        """ % StudentDates._meta.db_table)
    # Comprehension replaces the manual append loop.
    return [record[0] for record in cursor.fetchall()]
def get_departments(schools):
    """
    Retrieves a sorted list of all the departments.

    :param schools: the list of schools to list the departments for.
    :type schools: list
    :return: the list of departments
    :rtype: list
    """
    if not schools:
        # No schools selected -> no departments (the original built an
        # ``in ('')`` clause in this case).
        return []
    # Bind each school name as a query parameter instead of joining the raw
    # values into the SQL string (the original was open to SQL injection).
    placeholders = ",".join(["%s"] * len(schools))
    sql = """
        select distinct(owning_department_clevel)
        from %s
        where owning_school_clevel in (%s)
        order by owning_department_clevel
        """ % (GradeResults._meta.db_table, placeholders)
    cursor = connection.cursor()
    cursor.execute(sql, list(schools))
    return [row[0] for row in cursor.fetchall()]
def get_papers(paper):
    """
    Retrieves a sorted list of all the papers that match the search (eg COMP900 or PSYC9%).

    :param paper: the paper search string (may contain SQL LIKE wildcards)
    :type paper: str
    :return: the list of papers
    :rtype: list
    """
    # Only the table name (an identifier) is %-formatted; the user-supplied
    # search string is bound as a parameter to prevent SQL injection.
    sql = """
        select distinct(paper_master_code)
        from %s
        where paper_master_code like %%s
        order by paper_master_code
        """ % GradeResults._meta.db_table
    cursor = connection.cursor()
    cursor.execute(sql, [paper])
    return [row[0] for row in cursor.fetchall()]
def get_scholarships():
    """
    Retrieves a sorted list of all scholarships available.

    :return: the list of scholarships
    :rtype: list
    """
    sql = """
        SELECT DISTINCT(name)
        FROM %s
        WHERE char_length(name) > 0
        ORDER BY name ASC
        """ % Scholarship._meta.db_table
    cursor = connection.cursor()
    cursor.execute(sql)
    # Comprehension replaces the manual append loop.
    return [entry[0] for entry in cursor.fetchall()]