python类cursor()的实例源码

views.py 文件源码 项目:pfb-network-connectivity 作者: azavea 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get(self, request, format=None, *args, **kwargs):
        query = """
        SELECT row_to_json(fc)
        FROM (
            SELECT 'FeatureCollection' AS type,
                array_to_json(array_agg(f)) AS features
            FROM (SELECT 'Feature' AS type,
                  ST_AsGeoJSON(g.geom_simple)::json AS geometry,
                  g.uuid AS id,
                  row_to_json((SELECT p FROM (
                    SELECT uuid AS id, name, label, state_abbrev, organization_id) AS p))
                    AS properties
            FROM pfb_analysis_neighborhood AS g WHERE g.visibility <> %s) AS f) AS fc;
        """

        with connection.cursor() as cursor:
            cursor.execute(query, [Neighborhood.Visibility.HIDDEN])
            json = cursor.fetchone()
            if not json or not len(json):
                return Response({})

        return Response(json[0])
sqlreviewDal.py 文件源码 项目:wizard 作者: honor100 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def getWorkChartsByPercent():
    cursor = connection.cursor()
    sql = "select sum(notes->'$.insert') `insert`,sum(notes->'$.delete') `delete`,sum(notes->'$.update') `update`," \
          "sum(notes->'$.create') `create`,sum(notes->'$.alter') `alter` from %s" % TABLE
    cursor.execute(sql)
    field_names = [item[0] for item in cursor.description]
    rawData = cursor.fetchall()
    #
    result = []
    for row in rawData:
        objDict = {}
        # ?????????????Dict?
        for index, value in enumerate(row):
            objDict[field_names[index]] = value
        result.append(objDict)
    return result

# ??????,????????????????????
views.py 文件源码 项目:handelsregister 作者: Amsterdam 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def health(request):
    # check database
    try:
        with connection.cursor() as cursor:
            cursor.execute("select 1")
            assert cursor.fetchone()
    except:
        log.exception("Database connectivity failed")
        return HttpResponse(
            "Database connectivity failed",
            content_type="text/plain", status=500)

    # check debug
    if settings.DEBUG:
        log.exception("Debug mode not allowed in production")
        return HttpResponse(
            "Debug mode not allowed in production",
            content_type="text/plain", status=500)

    return HttpResponse("Health OK", content_type='text/plain', status=200)
admin.py 文件源码 项目:DCPanel 作者: vladgr 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def validate_server_db1():
    """Server may have only single database of particular type"""
    cursor = connection.cursor()

    query = """
        SELECT MAX(counted) FROM
        (
            SELECT COUNT(*) AS counted
            FROM api_db
            WHERE type = "S"
            GROUP BY server_id,type_db
        ) AS counts;
        """

    cursor.execute(query)
    res = cursor.fetchone()[0]
    if res > 1:
        return [
            'Server may have only single database of particular type!']
    return []
views.py 文件源码 项目:pfb-network-connectivity 作者: azavea 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get(self, request, format=None, *args, **kwargs):
        """
        Uses raw query for fetching as GeoJSON because it is much faster to let PostGIS generate
        than Djangonauts serializer.
        """
        query = """
        SELECT row_to_json(fc)
        FROM (
            SELECT 'FeatureCollection' AS type,
                array_to_json(array_agg(f)) AS features
            FROM (SELECT 'Feature' AS type, ST_AsGeoJSON(g.geom_pt)::json AS geometry, g.uuid AS id,
                  row_to_json((SELECT p FROM (
                    SELECT uuid AS id, name, label, state_abbrev, organization_id) AS p))
                    AS properties
            FROM pfb_analysis_neighborhood AS g WHERE g.visibility <> %s) AS f)  AS fc;
        """

        with connection.cursor() as cursor:
            cursor.execute(query, [Neighborhood.Visibility.HIDDEN])
            json = cursor.fetchone()
            if not json or not len(json):
                return Response({})

        return Response(json[0])
managers.py 文件源码 项目:django-calaccess-processed-data 作者: california-civic-data-coalition 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def load_raw_data(self):
        """
        Load the model by executing its raw sql load query.

        Temporarily drops any constraints or indexes on the model.
        """
        try:
            self.drop_constraints_and_indexes()
        except ValueError as e:
            print(e)
            print('Constrained fields: %s' % self.constrained_fields)
            print('Indexed fields: %s' % self.indexed_fields)
            dropped = False
        else:
            dropped = True

        c = connection.cursor()
        try:
            c.execute(self.raw_data_load_query)
        finally:
            c.close()
            if dropped:
                self.add_constraints_and_indexes()
test_export.py 文件源码 项目:django-scrub-pii 作者: MatthewWilkes 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_query_sanitises_date_data(self):
        first_purchase = Purchase.objects.get(book__title="Baking things")
        second_purchase = Purchase.objects.get(book__title="Treaty Negotiations")
        clive = Person.objects.get(first_name="Clive")

        assert first_purchase.purchased_at > second_purchase.purchased_at
        assert clive.date_of_birth == datetime.date(1920, 1, 9)

        cursor = connection.cursor()
        cursor.execute(get_updates_for_model(Purchase))
        cursor.execute(get_updates_for_model(Person))

        first_purchase = Purchase.objects.get(book__title="Baking things")
        second_purchase = Purchase.objects.get(book__title="Treaty Negotiations")
        clive = Person.objects.get(pk=clive.pk)

        assert first_purchase.purchased_at == second_purchase.purchased_at
        assert clive.date_of_birth != datetime.date(1920, 1, 9)
views.py 文件源码 项目:PrivacyScore 作者: PrivacyScore 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def faq(request: HttpRequest):
    num_scans  = Site.objects.filter(scans__isnull=False).count()
    num_scanning_sites = Scan.objects.filter(end__isnull=True).count()

    query = '''SELECT
        COUNT(jsonb_array_length("result"->'leaks'))
        FROM backend_scanresult
        WHERE backend_scanresult.scan_id IN (
            SELECT backend_site.last_scan_id
            FROM backend_site
            WHERE backend_site.last_scan_id IS NOT NULL)
        AND jsonb_array_length("result"->'leaks') > 0'''

    with connection.cursor() as cursor:
        cursor.execute(query)
        num_sites_failing_serverleak = cursor.fetchone()[0]

    return render(request, 'frontend/faq.html', {
        'num_scanning_sites': num_scanning_sites,
        'num_scans':  num_scans,
        'num_sites': Site.objects.count(),
        'num_sites_failing_serverleak': num_sites_failing_serverleak
    })
statistics.py 文件源码 项目:DCRM 作者: 82Flex 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def db_status():
    cursor = connection.cursor()
    status = collections.OrderedDict()
    query = ['Uptime','Queries','Threads_running','Slow_queries','Flush_commands','Open_tables']

    for key in query:
        sql = ("SHOW STATUS LIKE '%s'") % key
        cursor.execute(sql)
        for (Variable_name, Value) in cursor:
            status[Variable_name] = int(Value)
    try:
        status['QPS'] = round(status['Queries']/status['Uptime'],2)
    except:
        status['QPS'] = 0

    return status
ensuretriggers.py 文件源码 项目:healthchecks_asgards 作者: andela 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _pg(cursor):
    cursor.execute("""
    CREATE OR REPLACE FUNCTION update_alert_after()
    RETURNS trigger AS $update_alert_after$
        BEGIN
            IF NEW.last_ping IS NOT NULL THEN
                NEW.alert_after := NEW.last_ping + NEW.timeout + NEW.grace;
            END IF;
            RETURN NEW;
        END;
    $update_alert_after$ LANGUAGE plpgsql;

    DROP TRIGGER IF EXISTS update_alert_after ON api_check;

    CREATE TRIGGER update_alert_after
    BEFORE INSERT OR UPDATE OF last_ping, timeout, grace  ON api_check
    FOR EACH ROW EXECUTE PROCEDURE update_alert_after();
    """)
ensuretriggers.py 文件源码 项目:healthchecks_asgards 作者: andela 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _sqlite(cursor):
    cursor.execute("""
    DROP TRIGGER IF EXISTS update_alert_after;
    """)

    cursor.execute("""
    CREATE TRIGGER update_alert_after
    AFTER UPDATE OF last_ping, timeout, grace ON api_check
    FOR EACH ROW BEGIN
        UPDATE api_check
        SET alert_after =
            datetime(strftime('%s', last_ping) +
            timeout/1000000 + grace/1000000, 'unixepoch')
        WHERE id = OLD.id;
    END;
    """)
unseed_db.py 文件源码 项目:micromasters 作者: mitodl 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def remove_delete_protection(*models):
    """
    Temporarily removes delete protection on any number of models

    Args:
        *models: One or more models whose tables will have delete protection temporarily removed
    """
    table_names = [model._meta.db_table for model in models]
    with connection.cursor() as cursor:
        for table_name in table_names:
            cursor.execute("DROP RULE delete_protect ON {}".format(table_name))
        try:
            yield
        finally:
            for table_name in reversed(table_names):
                cursor.execute("CREATE RULE delete_protect AS ON DELETE TO {} DO INSTEAD NOTHING".format(table_name))
conftest.py 文件源码 项目:micromasters 作者: mitodl 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def django_db_setup(django_db_setup, django_db_blocker, database_loader):
    """
    Fixture provided by pytest-django to allow for custom Django database config.
    'django_db_setup' exists in the arguments because we want to perform the normal pytest-django
    database setup before applying our own changes.
    """
    with django_db_blocker.unblock():
        with connection.cursor() as cur:
            load_from_existing_db = should_load_from_existing_db(database_loader, cur)
            if not load_from_existing_db:
                # Drop a wagtail table due to a bug: https://github.com/wagtail/wagtail/issues/1824
                cur.execute('DROP TABLE IF EXISTS wagtailsearch_editorspick CASCADE;')
                # Create the initial post-migration database backup to be restored before each test case
                database_loader.create_backup(db_cursor=cur)
    if load_from_existing_db:
        with django_db_blocker.unblock():
            terminate_db_connections()
        database_loader.load_backup()
util.py 文件源码 项目:micromasters 作者: mitodl 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def terminate_db_connections():
    """Terminates active connections to the database being used by Django"""
    kill_connection_sql = \
        "SELECT pg_terminate_backend(pg_stat_activity.pid) FROM pg_stat_activity WHERE pid <> pg_backend_pid();"
    with connection.cursor() as cur:
        cur.execute(kill_connection_sql)
    connection.close()
util.py 文件源码 项目:micromasters 作者: mitodl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def should_load_from_existing_db(database_loader, cursor):
    """
    Helper method to determine whether or not a backup database should be loaded to begin
    test execution. A backup db should be used if that backup exists, and if the pytest config
    options don't indicate that the database should be freshly created to start the the test
    suite execution.

    Args:
        database_loader (DatabaseLoader): A DatabaseLoader instance
        cursor (django.db.connection.cursor): A database cursor

    Returns:
        bool: Whether or not a backup database should be loaded to begin test execution
    """
    # We should load a db backup to start the test suite if that backup exists,
    # and if the config options don't indicate that the database should be freshly
    # created to start the the test suite execution
    return (
        pytest.config.option.reuse_db and
        not pytest.config.option.create_db and
        database_loader.has_backup(db_cursor=cursor)
    )
sqlreviewDal.py 文件源码 项目:wizard 作者: honor100 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def updateStatistics():
    cursor = connection.cursor()
    sql = "select id, sql_content, notes from %s" % TABLE
    cursor.execute(sql)
    result = cursor.fetchall()
    #
    for id, sql_content, notes in result:
        dxl = {'insert':0, 'delete':0, 'update':0, 'create':0, 'alter':0}
        sqlList = sql_content.lower().split(';')
        for sql in sqlList:
            for key in dxl.keys():
                if sql.strip().startswith(key):
                    dxl[key] += 1
        #
        notes = json.loads(notes)
        notes.update(dxl)
        update_sql = """update %s set notes='%s' where id=%d""" % (TABLE, json.dumps(notes), id)
        cursor.execute(update_sql)
views_ajax.py 文件源码 项目:wizard 作者: honor100 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def getDbsFromClusterName(request):
    if request.is_ajax():
        cluster_name = request.POST.get("cluster_name")
    else:
        cluster_name = request.POST['cluster_name']

    # cursor = connection.cursor()
    # cursor.execute("select distinct table_schema from dbconfig_mysql_cluster_metadata where cluster_name=%s", (cluster_name,))
    # dbs = [db[0] for db in cursor.fetchall()]
    # cursor.close()

    dictConn = getSlaveConnStr(cluster_name)
    if not dictConn:
        dictConn = getMasterConnStr(cluster_name)
    Host = dictConn['Host']
    Port = dictConn['Port']
    User = dictConn['User']
    Password = dictConn['Password']
    dbs = getMySQLClusterDbs(Host, Port, User, Password)

    result = {'status':0, 'msg':'ok', 'data':dbs}
    return HttpResponse(json.dumps(result), content_type='application/json')
operations.py 文件源码 项目:django-mpathy 作者: craigds 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def post_migrate_mpathnode(model):
    # Note: model *isn't* a subclass of MPathNode, because django migrations are Weird.
    # if not issubclass(model, MPathNode):
    # Hence the following workaround:
    try:
        ltree_field = model._meta.get_field('ltree')
        if not isinstance(ltree_field, LTreeField):
            return
    except FieldDoesNotExist:
        return

    names = {
        "table": quote_ident(model._meta.db_table, connection.connection),
        "check_constraint": quote_ident('%s__check_ltree' % model._meta.db_table, connection.connection),
    }

    cur = connection.cursor()
    # Check that the ltree is always consistent with being a child of _parent
    cur.execute('''
        ALTER TABLE %(table)s ADD CONSTRAINT %(check_constraint)s CHECK (
            (parent_id IS NOT NULL AND ltree ~ (parent_id::text || '.*{1}')::lquery)
            OR (parent_id IS NULL AND ltree ~ '*{1}'::lquery)
        )
    ''' % names)
views.py 文件源码 项目:autoreg 作者: pbeyssac 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def domainundelete(request, fqdn):
  if request.method != "POST":
    raise SuspiciousOperation
  if not request.user.is_authenticated() or not request.user.is_active:
    raise PermissionDenied
  if fqdn != fqdn.lower():
    raise PermissionDenied
  if not check_handle_domain_auth(connection.cursor(),
                                  request.user.username, fqdn):
    return HttpResponseForbidden(_("Unauthorized"))

  dbh = psycopg2.connect(autoreg.conf.dbstring)
  dd = autoreg.dns.db.db(dbh)
  dd.login('autoreg')

  dd.undelete(fqdn, None)

  return HttpResponseRedirect(reverse(domainedit, args=[fqdn]))
views.py 文件源码 项目:autoreg 作者: pbeyssac 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _gen_checksoa_log(domain, handle, nsiplist=None, doit=False,
                      newdomain=False, form=None, dnsdb=None,
                      level=autoreg.dns.check.LEVEL_NS):
  """Same as _gen_checksoa(), and keep a log of the output."""
  soac = autoreg.dns.check.SOAChecker(domain, {}, {})
  soac.set_level(level)
  rec = []
  dbc = connection.cursor()
  contact = Contacts.objects.get(handle=handle.upper())
  for line in _gen_checksoa(domain, nsiplist, doit, dnsdb, soac, contact,
                            newdomain, form):
    rec.append(line)
    yield line
  dbc.execute("INSERT INTO requests_log"
              " (fqdn, contact_id, output, errors, warnings)"
              " VALUES (%s, %s, %s, %s, %s)",
              (domain, contact.id, ''.join(rec), soac.errs, soac.warns))
  assert dbc.rowcount == 1
views.py 文件源码 项目:autoreg 作者: pbeyssac 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def rqdom(request, domain):
  if request.method != "GET":
    raise SuspiciousOperation
  if not request.user.is_authenticated() or not request.user.is_active:
    return HttpResponseRedirect(URILOGIN + '?next=%s' % request.path)
  login = admin_login(connection.cursor(), request.user.username)
  if not login:
    raise PermissionDenied
  if domain.upper() != domain:
    return HttpResponseRedirect(reverse(rqlistdom, args=[domain.upper()]))

  rlist = _rq_list_dom(domain)
  i = 1
  for r in rlist:
    _rq1(request, r)
    r.suffix = i
    i += 1
  vars = { 'rlist': rlist,
           'goto': request.GET.get('page', '') }
  return render(request, 'requests/rqdisplay.html', vars)
views.py 文件源码 项目:autoreg 作者: pbeyssac 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def rqdisplaychecked(request):
  if request.method != "GET":
    raise SuspiciousOperation
  if not request.user.is_authenticated() or not request.user.is_active:
    return HttpResponseRedirect(URILOGIN + '?next=%s' % request.path)
  login = admin_login(connection.cursor(), request.user.username)
  if not login:
    raise PermissionDenied

  rlist = _rq_list_dom(domain)
  i = 1
  for r in rlist:
    _rq1(request, r)
    r.suffix = i
    i += 1
  vars = { 'rlist': rlist,
           'goto': request.GET.get('page', '') }
  return render(request, 'requests/rqdisplay.html', vars)
views.py 文件源码 项目:autoreg 作者: pbeyssac 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def rqlistdom(request, domain=None):
  if request.method != "GET":
    raise SuspiciousOperation
  if not request.user.is_authenticated() or not request.user.is_active:
    return HttpResponseRedirect(URILOGIN + '?next=%s' % request.path)
  login = admin_login(connection.cursor(), request.user.username)
  if not login:
    raise PermissionDenied
  if domain is None:
    # domain not in URL, provided by "?domain=..." argument (search form)
    domain = request.GET.get('domain', '').upper()
  elif domain.upper() != domain:
    return HttpResponseRedirect(reverse(rqlistdom, args=[domain.upper()]))

  z = autoreg.zauth.ZAuth(connection.cursor())

  rlist = _rq_list_dom(domain)
  for r in rlist:
    if not z.checkparent(r.fqdn, login):
      continue
    _rq_decorate(r)

  vars = { 'rlist': rlist, 'fqdn': domain,
           'goto': request.GET.get('page', '') }
  return render(request, 'requests/rqlistdom.html', vars)
models.py 文件源码 项目:autoreg 作者: pbeyssac 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def rq_run(out):
  import autoreg.dns.db
  dd = autoreg.dns.db.db(dbc=connection.cursor())
  dd.login('autoreg')
  whoisdb = autoreg.whois.db.Main(dbc=connection.cursor())

  rl = Requests.objects.exclude(pending_state=None).order_by('id')

  for r in rl:
    with transaction.atomic():
      r2 = Requests.objects.select_for_update().get(id=r.id)
      try:
        r2.do_pending_exc(out, dd, whoisdb)
        ok = True
      except IntegrityError as e:
        print(six.text_type(e), file=out)
        ok = False
      if ok:
        print(_("Status: committed"), file=out)
      else:
        print(_("Status: cancelled"), file=out)
convert_categories_to_tags.py 文件源码 项目:prestashop-sync 作者: dragoon 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def handle_noargs(self, **opts):
        from django.db import connection

        c = connection.cursor()

        for article in Article.objects.all():
            c.execute("""SELECT c.slug
FROM articles_article_categories aac
JOIN articles_category c
ON aac.category_id = c.id
WHERE aac.article_id=%s""", (article.id,))

            names = [row[0] for row in c.fetchall()]
            tags = [Tag.objects.get_or_create(name=t)[0] for t in names]
            article.tags = tags
            article.save()
views.py 文件源码 项目:automated-reporting 作者: Waikato 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_max_years():
    """
    Determines the maximum number of available years to go back.

    :return: the number of years
    :rtype: int
    """

    cursor = connection.cursor()
    cursor.execute("""
        select extract(year from min(start_date))
        from %s
        where start_date > '1900-01-01'
        """ % StudentDates._meta.db_table)
    min_year = None
    max_years = None
    for row in cursor.fetchall():
        min_year = row[0]
    if min_year is not None:
        max_years = date.today().year - min_year

    return int(max_years)
views.py 文件源码 项目:automated-reporting 作者: Waikato 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_schools():
    """
    Retrieves a sorted list of all the deparments.

    :return: the list of departments
    :rtype: list
    """

    cursor = connection.cursor()
    cursor.execute("""
        SELECT DISTINCT(school)
        FROM %s
        WHERE char_length(school) > 0
        ORDER BY school ASC
        """ % StudentDates._meta.db_table)
    result = []
    for row in cursor.fetchall():
        result.append(row[0])
    return result
views.py 文件源码 项目:automated-reporting 作者: Waikato 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_departments(schools):
    """
    Retrieves a sorted list of all the departments.

    :param schools: the list of schools to list the departments for.
    :type schools: list
    :return: the list of departments
    :rtype: list
    """

    sql = """
        select distinct(owning_department_clevel)
        from %s
        where owning_school_clevel in ('%s')
        order by owning_department_clevel
        """ % (GradeResults._meta.db_table, "','".join(schools))
    cursor = connection.cursor()
    cursor.execute(sql)
    result = []
    for row in cursor.fetchall():
        result.append(row[0])
    return result
views.py 文件源码 项目:automated-reporting 作者: Waikato 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_papers(paper):
    """
    Retrieves a sorted list of all the papers that match the search (eg COMP900 or PSYC9%).

    :param paper: the paper search string
    :type paper: str
    :return: the list of papers
    :rtype: list
    """

    sql = """
        select distinct(paper_master_code)
        from %s
        where paper_master_code like '%s'
        order by paper_master_code
        """ % (GradeResults._meta.db_table, paper)
    cursor = connection.cursor()
    cursor.execute(sql)
    result = []
    for row in cursor.fetchall():
        result.append(row[0])
    return result
views.py 文件源码 项目:automated-reporting 作者: Waikato 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_scholarships():
    """
    Retrieves a sorted list of all scholarships available.

    :return: the list of scholarships
    :rtype: list
    """
    cursor = connection.cursor()
    cursor.execute("""
        SELECT DISTINCT(name)
        FROM %s
        WHERE char_length(name) > 0
        ORDER BY name ASC
        """ % Scholarship._meta.db_table)
    result = []
    for row in cursor.fetchall():
        result.append(row[0])
    return result


问题


面经


文章

微信
公众号

扫码关注公众号