python类ProgrammingError()的实例源码

psql.py 文件源码 项目:ravel 作者: ravel-net 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def default(self, line):
        """Execute a PostgreSQL statement

        The statement in ``line`` is run on ``self.db.cursor``.  If the
        statement fails with ``psycopg2.ProgrammingError`` the error is
        printed and nothing else happens.  Otherwise any fetched rows are
        printed as a table whose headers come from the cursor description.
        """
        # ``except X as e`` and ``print(...)`` are valid on both Python 2.6+
        # and Python 3; the previous ``except X, e`` / ``print e`` forms are a
        # SyntaxError on Python 3.
        try:
            self.db.cursor.execute(line)
        except psycopg2.ProgrammingError as e:
            print(e)
            return

        try:
            data = self.db.cursor.fetchall()
            if data is not None:
                names = [row[0] for row in self.db.cursor.description]
                print(tabulate.tabulate(data, headers=names))
        except psycopg2.ProgrammingError:
            # no results, eg from an insert/delete
            pass
        except TypeError as e:
            print(e)
engines.py 文件源码 项目:sqlibrist 作者: condograde 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def apply_migration(self, name, statements, fake=False):
        """Run a migration's SQL and record it in ``sqlibrist.migrations``.

        :param name: migration name; only the part after the last ``/`` is
            stored in the migrations table.
        :param statements: SQL text to execute; skipped when it is blank.
        :param fake: when true, the SQL is not executed but the migration is
            still recorded.
        :raises ApplyMigrationFailed: if executing ``statements`` raises
            ``psycopg2.OperationalError`` or ``psycopg2.ProgrammingError``
            (the transaction is rolled back and the error printed first).
        """
        import psycopg2
        connection = self.get_connection()
        with connection.cursor() as cursor:
            try:
                if not fake and statements.strip():
                    cursor.execute(statements)
            except (
                    psycopg2.OperationalError,
                    psycopg2.ProgrammingError) as e:
                connection.rollback()
                # Exceptions have no ``.message`` attribute on Python 3, so
                # print the exception itself instead of failing with an
                # AttributeError inside the error handler.
                print(e)
                from sqlibrist.helpers import ApplyMigrationFailed

                raise ApplyMigrationFailed
            else:
                cursor.execute('INSERT INTO sqlibrist.migrations '
                               '(migration) VALUES (%s);',
                               [name.split('/')[-1]])
                connection.commit()
test_slot.py 文件源码 项目:pg2kinesis 作者: handshake 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_delete_slot(slot):
    """Exercise ``slot.delete_slot()`` against stubbed replication cursors.

    Three scenarios are checked:
    1. the drop raises ProgrammingError with pgcode UNDEFINED_OBJECT and
       ``delete_slot()`` is expected not to raise;
    2. ProgrammingError with an unrelated pgcode (-1) is expected to
       propagate;
    3. ``delete_slot()`` is expected to raise while a generic ``Exception``
       stub is installed.
    """
    with patch.object(psycopg2.ProgrammingError, 'pgcode',
                      new_callable=PropertyMock,
                      return_value=psycopg2.errorcodes.UNDEFINED_OBJECT):
        pe = psycopg2.ProgrammingError()
        slot._repl_cursor.drop_replication_slot = Mock(side_effect=pe)
        slot.delete_slot()
    slot._repl_cursor.drop_replication_slot.assert_called_with('pg2kinesis')

    with patch.object(psycopg2.ProgrammingError, 'pgcode',
                      new_callable=PropertyMock,
                      return_value=-1):
        pe = psycopg2.ProgrammingError()
        # NOTE(review): this stubs create_replication_slot, yet the assertions
        # below are about drop_replication_slot (still stubbed from the first
        # scenario) -- confirm which method was intended.
        slot._repl_cursor.create_replication_slot = Mock(side_effect=pe)
        with pytest.raises(psycopg2.ProgrammingError) as e_info:
            slot.delete_slot()
        # These checks must sit outside the ``pytest.raises`` block: anything
        # placed after the raising call inside it is never executed.  They
        # stay inside the patch so the patched ``pgcode`` is still in effect.
        slot._repl_cursor.drop_replication_slot.assert_called_with('pg2kinesis')
        assert e_info.value.pgcode == -1

    slot._repl_cursor.create_replication_slot = Mock(side_effect=Exception)
    with pytest.raises(Exception):
        slot.delete_slot()
    # Same reasoning as above: assert after the raising block has exited.
    slot._repl_cursor.drop_replication_slot.assert_called_with('pg2kinesis')
engines.py 文件源码 项目:sqlibrist 作者: condograde 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def unapply_migration(self, name, statements, fake=False):
        """Run a migration's rollback SQL and remove its record.

        :param name: migration name, matched exactly against the
            ``migration`` column of ``sqlibrist.migrations``.
        :param statements: SQL text to execute.
        :param fake: when true, the SQL is not executed but the record is
            still deleted.
        :raises ApplyMigrationFailed: if executing ``statements`` raises
            ``psycopg2.OperationalError`` or ``psycopg2.ProgrammingError``
            (the transaction is rolled back and the error printed first).
        """
        import psycopg2
        connection = self.get_connection()
        with connection.cursor() as cursor:
            try:
                if not fake:
                    cursor.execute(statements)
            except (
                    psycopg2.OperationalError,
                    psycopg2.ProgrammingError) as e:
                connection.rollback()
                # Exceptions have no ``.message`` attribute on Python 3, so
                # print the exception itself instead of failing with an
                # AttributeError inside the error handler.
                print(e)
                from sqlibrist.helpers import ApplyMigrationFailed

                raise ApplyMigrationFailed
            else:
                cursor.execute('DELETE FROM sqlibrist.migrations '
                               'WHERE migration = (%s); ', [name])
                connection.commit()
engines.py 文件源码 项目:sqlibrist 作者: condograde 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def apply_migration(self, name, statements, fake=False):
        """Execute a migration on MySQL and record it in ``sqlibrist_migrations``.

        The SQL is skipped when ``fake`` is true or ``statements`` is blank;
        the migration row (last ``/``-separated component of ``name``) is
        inserted either way.  ``MySQLdb.OperationalError`` and
        ``MySQLdb.ProgrammingError`` are printed and converted into
        ``ApplyMigrationFailed``.
        """
        import MySQLdb
        db_cursor = self.get_connection().cursor()

        try:
            if not fake:
                if statements.strip():
                    db_cursor.execute(statements)
        except (MySQLdb.OperationalError, MySQLdb.ProgrammingError) as error:
            print('\n'.join(str(arg) for arg in error.args))
            from sqlibrist.helpers import ApplyMigrationFailed

            raise ApplyMigrationFailed

        # Only reached when the statements ran (or were skipped) without error.
        migration_id = name.split('/')[-1]
        db_cursor.execute('INSERT INTO sqlibrist_migrations '
                          '(migration) VALUES (%s);',
                          [migration_id])
engines.py 文件源码 项目:sqlibrist 作者: condograde 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def unapply_migration(self, name, statements, fake=False):
        """Execute a migration's rollback SQL on MySQL and delete its record.

        The SQL is skipped when ``fake`` is true; the row whose ``migration``
        equals ``name`` is deleted either way.  ``MySQLdb.OperationalError``
        and ``MySQLdb.ProgrammingError`` are printed and converted into
        ``ApplyMigrationFailed``.
        """
        import MySQLdb
        db_cursor = self.get_connection().cursor()

        try:
            if not fake:
                db_cursor.execute(statements)
        except (MySQLdb.OperationalError, MySQLdb.ProgrammingError) as error:
            print('\n'.join(str(arg) for arg in error.args))
            from sqlibrist.helpers import ApplyMigrationFailed

            raise ApplyMigrationFailed

        # Only reached when the statements ran (or were skipped) without error.
        db_cursor.execute('DELETE FROM sqlibrist_migrations '
                          'WHERE migration = (%s); ', [name])
manager.py 文件源码 项目:ldap2pg 作者: dalibo 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def pg_fetch(self, psql, sql, processor=None):
        """Fetch rows for a customizable inspection query.

        ``sql`` may be ``None`` (inspection disabled, returns ``[]``), a list
        (static rows, copied) or anything else, which is passed to the
        ``psql`` callable.  The optional ``processor`` is applied to the rows
        and the result is always returned as a list.  A
        ``psycopg2.ProgrammingError`` is re-raised as ``UserError``.
        """
        if sql is None:
            # Inspection disabled.
            return []

        try:
            # A list is a static inspection; copy it rather than hand back
            # the caller's object.
            fetched = sql[:] if isinstance(sql, list) else psql(sql)
            if processor:
                fetched = processor(fetched)
            return fetched if isinstance(fetched, list) else list(fetched)
        except psycopg2.ProgrammingError as e:
            # The query is treated as user supplied, so report a user error.
            raise UserError(str(e))
testutils.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def skip_if_tpc_disabled(f):
    """Decorator: skip the test when two-phase commit is unavailable.

    The wrapper queries ``max_prepared_transactions`` on a fresh connection
    and skips when the query fails with ProgrammingError or the value is 0.
    """
    @wraps(f)
    def skip_if_tpc_disabled_(self):
        from psycopg2 import ProgrammingError
        connection = self.connect()
        cursor = connection.cursor()
        try:
            cursor.execute("SHOW max_prepared_transactions;")
        except ProgrammingError:
            return self.skipTest(
                "server too old: two phase transactions not supported.")
        max_prepared = int(cursor.fetchone()[0])
        connection.close()

        if max_prepared:
            return f(self)
        return self.skipTest(
            "server not configured for two phase transactions. "
            "set max_prepared_transactions to > 0 to run the test")

    return skip_if_tpc_disabled_
test_connection.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def clear_test_xacts(self):
        """Roll back every prepared transaction found for the test database.

        If listing ``pg_prepared_xacts`` raises ProgrammingError the
        connection is rolled back and closed without doing anything else.
        """
        cnn = self.connect()
        cnn.set_isolation_level(0)
        cur = cnn.cursor()
        try:
            cur.execute(
                "select gid from pg_prepared_xacts where database = %s",
                (dbname,))
        except psycopg2.ProgrammingError:
            cnn.rollback()
        else:
            # Collect the ids first: the same cursor is reused for the
            # rollback statements below.
            pending = [row[0] for row in cur]
            for gid in pending:
                cur.execute("rollback prepared %s;", (gid,))
        cnn.close()
test_copy.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_copy_from_segfault(self):
        # Regression check for issue #219: issue "copy ... from stdin" through
        # execute() in a child interpreter and require a clean exit code.
        template = """\
import psycopg2
conn = psycopg2.connect(%(dsn)r)
curs = conn.cursor()
curs.execute("create table copy_segf (id int)")
try:
    curs.execute("copy copy_segf from stdin")
except psycopg2.ProgrammingError:
    pass
conn.close()
"""
        script = template % {'dsn': dsn}

        child = Popen([sys.executable, '-c', script_to_py3(script)])
        child.communicate()
        self.assertEqual(0, child.returncode)
test_copy.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def test_copy_to_segfault(self):
        # Regression check for issue #219: issue "copy ... to stdout" through
        # execute() in a child interpreter and require a clean exit code.
        template = """\
import psycopg2
conn = psycopg2.connect(%(dsn)r)
curs = conn.cursor()
curs.execute("create table copy_segf (id int)")
try:
    curs.execute("copy copy_segf to stdout")
except psycopg2.ProgrammingError:
    pass
conn.close()
"""
        script = template % {'dsn': dsn}

        # The child's stdout is captured through a pipe.
        child = Popen([sys.executable, '-c', script_to_py3(script)], stdout=PIPE)
        child.communicate()
        self.assertEqual(0, child.returncode)
test_types_extras.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _create_type(self, name, fields):
        """(Re)create a composite type and return its oid.

        ``name`` may be schema-qualified (``schema.type``); otherwise the
        ``public`` schema is used for the oid lookup.  ``fields`` is an
        iterable of two-item tuples, each formatted into the type definition
        as ``"<first> <second>"``.  Any existing type of that name is dropped
        first.
        """
        curs = self.conn.cursor()
        try:
            curs.execute("drop type %s cascade;" % name)
        except psycopg2.ProgrammingError:
            # Dropping failed: reset the transaction before creating the type.
            self.conn.rollback()

        columns = ", ".join("%s %s" % field for field in fields)
        curs.execute("create type %s as (%s);" % (name, columns))

        schema, name = name.split('.') if '.' in name else ('public', name)

        oid_query = """\
            SELECT t.oid
            FROM pg_type t JOIN pg_namespace ns ON typnamespace = ns.oid
            WHERE typname = %s and nspname = %s;
            """
        curs.execute(oid_query, (name, schema))
        oid = curs.fetchone()[0]
        self.conn.commit()
        return oid
test_cursor.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_withhold(self):
        """Check the ``withhold`` attribute and that results survive commit."""
        query = "select data from withhold order by data"
        expected = [(10,), (20,), (30,)]

        # withhold is rejected for a cursor created without a name
        with self.assertRaises(psycopg2.ProgrammingError):
            self.conn.cursor(withhold=True)

        self._create_withhold_table()

        # withhold enabled through the attribute
        named = self.conn.cursor("W")
        self.assertEqual(named.withhold, False)
        named.withhold = True
        self.assertEqual(named.withhold, True)
        named.execute(query)
        self.conn.commit()
        self.assertEqual(named.fetchall(), expected)
        named.close()

        # withhold enabled through the constructor argument
        named = self.conn.cursor("W", withhold=True)
        self.assertEqual(named.withhold, True)
        named.execute(query)
        self.conn.commit()
        self.assertEqual(named.fetchall(), expected)

        cleanup = self.conn.cursor()
        cleanup.execute("drop table withhold")
        self.conn.commit()
test_cursor.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_not_scrollable(self):
        """Check that a ``scrollable=False`` cursor refuses to scroll backwards."""
        # scrollable is rejected for a cursor created without a name
        with self.assertRaises(psycopg2.ProgrammingError):
            self.conn.cursor(scrollable=False)

        setup = self.conn.cursor()
        setup.execute("create table scrollable (data int)")
        setup.executemany("insert into scrollable values (%s)",
            [(i,) for i in range(100)])
        setup.close()

        probe = self.conn.cursor("S")    # default scrollability
        probe.execute("select * from scrollable")
        self.assertEqual(probe.scrollable, None)
        probe.scroll(2)
        try:
            probe.scroll(-1)
        except psycopg2.OperationalError:
            # Backward scroll already fails with default scrollability, so
            # the explicit non-scrollable case below cannot be evaluated.
            return self.skipTest("can't evaluate non-scrollable cursor")
        probe.close()

        fixed = self.conn.cursor("S", scrollable=False)
        self.assertEqual(fixed.scrollable, False)
        fixed.execute("select * from scrollable")
        fixed.scroll(2)
        with self.assertRaises(psycopg2.OperationalError):
            fixed.scroll(-1)
test_async.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_async_iter(self):
        """Iterating a cursor fails mid-query and works once it has completed."""
        cursor = self.conn.cursor()

        cursor.execute("begin")
        self.wait(cursor)
        cursor.execute("""
            insert into table1 values (1);
            insert into table1 values (2);
            insert into table1 values (3);
        """)
        self.wait(cursor)
        cursor.execute("select id from table1 order by id")

        # the select has not been waited on yet, so iteration must fail
        with self.assertRaises(psycopg2.ProgrammingError):
            list(cursor)

        # once the query has completed the rows can be consumed
        self.wait(cursor)
        rows = list(cursor)
        self.assertEqual(rows, [(1, ), (2, ), (3, )])
        self.assertFalse(self.conn.isexecuting())
testutils.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 52 收藏 0 点赞 0 评论 0
def skip_if_tpc_disabled(f):
    """Decorator skipping a test when the server cannot do two-phase commit.

    The check is made on a fresh connection by reading
    ``max_prepared_transactions``; a ProgrammingError or a value of 0 both
    lead to a skip.
    """
    @wraps(f)
    def skip_if_tpc_disabled_(self):
        from psycopg2 import ProgrammingError
        conn = self.connect()
        curs = conn.cursor()
        try:
            curs.execute("SHOW max_prepared_transactions;")
        except ProgrammingError:
            return self.skipTest(
                "server too old: two phase transactions not supported.")
        prepared_limit = int(curs.fetchone()[0])
        conn.close()

        if prepared_limit:
            return f(self)
        return self.skipTest(
            "server not configured for two phase transactions. "
            "set max_prepared_transactions to > 0 to run the test")

    return skip_if_tpc_disabled_
test_connection.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def clear_test_xacts(self):
        """Discard all prepared transactions left in the testing database.

        A ProgrammingError while listing ``pg_prepared_xacts`` ends the
        cleanup early: the connection is rolled back and closed.
        """
        cnn = self.connect()
        cnn.set_isolation_level(0)
        cur = cnn.cursor()
        try:
            cur.execute(
                "select gid from pg_prepared_xacts where database = %s",
                (dbname,))
        except psycopg2.ProgrammingError:
            cnn.rollback()
        else:
            # Read every gid before issuing further statements on this cursor.
            leftover = [record[0] for record in cur]
            for gid in leftover:
                cur.execute("rollback prepared %s;", (gid,))
        cnn.close()
test_copy.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_copy_to_segfault(self):
        # Issue #219 regression check: the child interpreter runs
        # "copy ... to stdout" via execute() and must exit with status 0.
        source = """\
import psycopg2
conn = psycopg2.connect(%(dsn)r)
curs = conn.cursor()
curs.execute("create table copy_segf (id int)")
try:
    curs.execute("copy copy_segf to stdout")
except psycopg2.ProgrammingError:
    pass
conn.close()
"""
        script = source % {'dsn': dsn}
        command = [sys.executable, '-c', script_to_py3(script)]

        # The child's stdout is captured through a pipe.
        proc = Popen(command, stdout=PIPE)
        proc.communicate()
        self.assertEqual(0, proc.returncode)
test_cursor.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_withhold(self):
        """Named cursors with ``withhold`` keep their rows across a commit."""
        select_sql = "select data from withhold order by data"
        all_rows = [(10,), (20,), (30,)]

        # withhold is rejected for a cursor created without a name
        with self.assertRaises(psycopg2.ProgrammingError):
            self.conn.cursor(withhold=True)

        self._create_withhold_table()

        # case 1: flag flipped on an existing named cursor
        curs = self.conn.cursor("W")
        self.assertEqual(curs.withhold, False)
        curs.withhold = True
        self.assertEqual(curs.withhold, True)
        curs.execute(select_sql)
        self.conn.commit()
        self.assertEqual(curs.fetchall(), all_rows)
        curs.close()

        # case 2: flag passed when the cursor is created
        curs = self.conn.cursor("W", withhold=True)
        self.assertEqual(curs.withhold, True)
        curs.execute(select_sql)
        self.conn.commit()
        self.assertEqual(curs.fetchall(), all_rows)

        dropper = self.conn.cursor()
        dropper.execute("drop table withhold")
        self.conn.commit()
test_cursor.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_not_scrollable(self):
        """A named cursor created with ``scrollable=False`` cannot scroll back."""
        # scrollable is rejected for a cursor created without a name
        with self.assertRaises(psycopg2.ProgrammingError):
            self.conn.cursor(scrollable=False)

        loader = self.conn.cursor()
        loader.execute("create table scrollable (data int)")
        loader.executemany("insert into scrollable values (%s)",
            [(i,) for i in range(100)])
        loader.close()

        default_curs = self.conn.cursor("S")    # default scrollability
        default_curs.execute("select * from scrollable")
        self.assertEqual(default_curs.scrollable, None)
        default_curs.scroll(2)
        try:
            default_curs.scroll(-1)
        except psycopg2.OperationalError:
            # With default scrollability a backward scroll already fails
            # here, so the explicit case below would prove nothing.
            return self.skipTest("can't evaluate non-scrollable cursor")
        default_curs.close()

        strict_curs = self.conn.cursor("S", scrollable=False)
        self.assertEqual(strict_curs.scrollable, False)
        strict_curs.execute("select * from scrollable")
        strict_curs.scroll(2)
        with self.assertRaises(psycopg2.OperationalError):
            strict_curs.scroll(-1)
test_async.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 50 收藏 0 点赞 0 评论 0
def test_set_parameters_while_async(self):
        """Connection parameters cannot be changed while a query is running.

        After starting a query without waiting for it, reading the
        transaction status must still work, while changing the client
        encoding or the isolation level must raise ProgrammingError.
        """
        cur = self.conn.cursor()

        cur.execute("select 'c'")
        self.assertTrue(self.conn.isexecuting())

        # getting transaction status works
        # (assertEqual: the assertEquals alias was removed in Python 3.12)
        self.assertEqual(self.conn.get_transaction_status(),
                         ext.TRANSACTION_STATUS_ACTIVE)
        self.assertTrue(self.conn.isexecuting())

        # setting connection encoding should fail
        self.assertRaises(psycopg2.ProgrammingError,
                          self.conn.set_client_encoding, "LATIN1")

        # same for transaction isolation
        self.assertRaises(psycopg2.ProgrammingError,
                          self.conn.set_isolation_level, 1)
test_async.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_async_iter(self):
        """Iterating a cursor fails while a query is underway, then succeeds.

        Three rows are inserted inside a transaction and a select is started
        without being waited on; iteration must raise ProgrammingError until
        the query has been waited for, after which all rows are returned.
        """
        cur = self.conn.cursor()

        cur.execute("begin")
        self.wait(cur)
        cur.execute("""
            insert into table1 values (1);
            insert into table1 values (2);
            insert into table1 values (3);
        """)
        self.wait(cur)
        cur.execute("select id from table1 order by id")

        # iteration fails if a query is underway
        self.assertRaises(psycopg2.ProgrammingError, list, cur)

        # but after it's done it should work
        self.wait(cur)
        # assertEqual: the assertEquals alias was removed in Python 3.12
        self.assertEqual(list(cur), [(1, ), (2, ), (3, )])
        self.assertFalse(self.conn.isexecuting())
signals.py 文件源码 项目:CodingDojo 作者: ComputerSocietyUNB 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def register_hstore_handler(connection, **kwargs):
    """Register the hstore adapter globally for PostgreSQL connections.

    Connections whose vendor is not ``postgresql`` are ignored.  On Python 2
    the adapter is registered with ``unicode=True``.
    """
    if connection.vendor != 'postgresql':
        return

    options = {'globally': True}
    if six.PY2:
        options['unicode'] = True

    try:
        register_hstore(connection.connection, **options)
    except ProgrammingError:
        # Hstore is not available on the database.
        #
        # That is tolerated here: creating an hstore field will report the
        # error itself, and people may run PostgreSQL without extensions
        # installed while still using other contrib.postgres features.
        #
        # A working connection is also needed first in order to install the
        # hstore extension at all.
        pass
signals.py 文件源码 项目:NarshaTech 作者: KimJangHyeon 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def register_hstore_handler(connection, **kwargs):
    """Globally register hstore support when the backend is PostgreSQL.

    Any other vendor returns immediately.  ``unicode=True`` is passed to
    ``register_hstore`` only on Python 2.
    """
    if connection.vendor != 'postgresql':
        return

    extra = {'unicode': True} if six.PY2 else {}
    try:
        register_hstore(connection.connection, globally=True, **extra)
    except ProgrammingError:
        # Hstore is not available on the database.
        #
        # Ignored on purpose: an attempt to create an hstore field will fail
        # at that point, and PostgreSQL may be used without extensions
        # installed alongside other contrib.postgres features.
        #
        # The connection also has to be created before the hstore extension
        # can be installed through it.
        pass
conftest.py 文件源码 项目:occrp-timeline-tool 作者: datamade 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def database(request):
    """Create the test database and register its removal as a finalizer.

    Connection settings are read from ``DB_OPTS``.  ``_test`` is appended to
    the database name unless it already contains ``test``.  If creation fails
    with a "permission denied" ProgrammingError it is retried once as the
    ``postgres`` user, which is then also used for the drop.
    """
    pg_host, pg_port, pg_user = (
        DB_OPTS.get(key) for key in ("host", "port", "user"))
    pg_db = DB_OPTS.get("name", "tests")
    pg_db = pg_db if 'test' in pg_db else '{}_test'.format(pg_db)

    # Create our Database.
    try:
        init_postgresql_database(pg_user, pg_host, pg_port, pg_db)
    except psycopg2.ProgrammingError as e:
        if 'permission denied' not in str(e):
            raise e
        pg_user = 'postgres'
        init_postgresql_database(pg_user, pg_host, pg_port, pg_db)

    # Ensure our database gets deleted.
    def drop_database():
        drop_postgresql_database(pg_user, pg_host, pg_port, pg_db, 9.6)

    request.addfinalizer(drop_database)
extensions.py 文件源码 项目:postpy 作者: portfoliome 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def check_extension(conn, extension: str) -> bool:
    """Report whether ``extension`` is installed.

    Looks the extension up in ``pg_available_extensions`` and returns the
    truthiness of its ``installed_version``.  Raises
    ``psycopg2.ProgrammingError`` when the lookup returns no row for it.
    """

    query = 'SELECT installed_version FROM pg_available_extensions WHERE name=%s;'

    with conn.cursor() as cursor:
        cursor.execute(query, (extension,))
        row = cursor.fetchone()

    if row is not None:
        return bool(row[0])

    raise psycopg2.ProgrammingError(
        'Extension is not available for installation.', extension
    )
signals.py 文件源码 项目:Gypsy 作者: benticarlos 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def register_hstore_handler(connection, **kwargs):
    """Register hstore handling globally on a PostgreSQL connection.

    Does nothing for other vendors.  Python 2 additionally requests
    ``unicode=True`` from ``register_hstore``.
    """
    if connection.vendor != 'postgresql':
        return

    register_kwargs = dict(globally=True)
    if six.PY2:
        register_kwargs.update(unicode=True)

    try:
        register_hstore(connection.connection, **register_kwargs)
    except ProgrammingError:
        # Hstore is not available on the database.
        #
        # The failure is deliberately swallowed: anyone creating an hstore
        # field will hit the error there, and PostgreSQL without extensions
        # installed must still work for other contrib.postgres features.
        #
        # It is also required so that a connection can be created in order
        # to install the hstore extension.
        pass
testutils.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def skip_if_tpc_disabled(f):
    """Decorator that skips a test unless two-phase commit is usable.

    ``max_prepared_transactions`` is read on a fresh connection; the test is
    skipped if that query raises ProgrammingError or the setting is 0.
    """
    @wraps(f)
    def skip_if_tpc_disabled_(self):
        from psycopg2 import ProgrammingError
        db = self.connect()
        db_cur = db.cursor()
        try:
            db_cur.execute("SHOW max_prepared_transactions;")
        except ProgrammingError:
            return self.skipTest(
                "server too old: two phase transactions not supported.")
        tpc_slots = int(db_cur.fetchone()[0])
        db.close()

        if tpc_slots:
            return f(self)
        return self.skipTest(
            "server not configured for two phase transactions. "
            "set max_prepared_transactions to > 0 to run the test")

    return skip_if_tpc_disabled_
test_connection.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def clear_test_xacts(self):
        """Roll back the prepared transactions recorded for the test database.

        When the ``pg_prepared_xacts`` query raises ProgrammingError the
        connection is simply rolled back and closed.
        """
        cnn = self.connect()
        cnn.set_isolation_level(0)
        cur = cnn.cursor()
        try:
            cur.execute(
                "select gid from pg_prepared_xacts where database = %s",
                (dbname,))
        except psycopg2.ProgrammingError:
            cnn.rollback()
        else:
            # Materialise the ids up front; the cursor is reused just below.
            stale_gids = [entry[0] for entry in cur]
            for gid in stale_gids:
                cur.execute("rollback prepared %s;", (gid,))
        cnn.close()
test_copy.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_copy_from_segfault(self):
        # Issue #219 regression check: the child interpreter runs
        # "copy ... from stdin" via execute() and must exit with status 0.
        source = """\
import psycopg2
conn = psycopg2.connect(%(dsn)r)
curs = conn.cursor()
curs.execute("create table copy_segf (id int)")
try:
    curs.execute("copy copy_segf from stdin")
except psycopg2.ProgrammingError:
    pass
conn.close()
"""
        script = source % {'dsn': dsn}
        command = [sys.executable, '-c', script_to_py3(script)]

        proc = Popen(command)
        proc.communicate()
        self.assertEqual(0, proc.returncode)


问题


面经


文章

微信
公众号

扫码关注公众号