python类ProgrammingError()的实例源码

test_copy.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_copy_to_segfault(self):
        """Run a ``COPY ... TO STDOUT`` via ``execute()`` in a child interpreter.

        Regression test for issue #219. The statements are executed by a
        separate Python process, and the test passes when that process
        terminates with exit status 0.
        """
        template = """\
import psycopg2
conn = psycopg2.connect(%(dsn)r)
curs = conn.cursor()
curs.execute("create table copy_segf (id int)")
try:
    curs.execute("copy copy_segf to stdout")
except psycopg2.ProgrammingError:
    pass
conn.close()
"""
        child_source = template % {'dsn': dsn}

        # stdout is captured through a pipe and drained by communicate(),
        # which also waits for the child to finish.
        command = [sys.executable, '-c', script_to_py3(child_source)]
        child = Popen(command, stdout=PIPE)
        child.communicate()
        self.assertEqual(0, child.returncode)
test_types_extras.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _create_type(self, name, fields):
        """(Re)create a composite type and return its oid.

        name: type name, optionally qualified as ``schema.type``.
        fields: iterable of ``(field_name, field_type)`` pairs.

        Any existing type of that name is dropped first; a ProgrammingError
        from the drop is answered with a rollback and otherwise ignored.
        The oid is looked up in ``pg_type`` and the transaction is committed
        before returning.
        """
        cursor = self.conn.cursor()
        try:
            cursor.execute("drop type %s cascade;" % name)
        except psycopg2.ProgrammingError:
            self.conn.rollback()

        columns = ", ".join(["%s %s" % pair for pair in fields])
        cursor.execute("create type %s as (%s);" % (name, columns))

        # The catalog lookup needs schema and bare type name separately;
        # unqualified names are looked up in 'public'.
        schema = 'public'
        if '.' in name:
            schema, name = name.split('.')

        cursor.execute("""\
            SELECT t.oid
            FROM pg_type t JOIN pg_namespace ns ON typnamespace = ns.oid
            WHERE typname = %s and nspname = %s;
            """, (name, schema))
        row = cursor.fetchone()
        oid = row[0]
        self.conn.commit()
        return oid
test_cursor.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def test_withhold(self):
        """Check the ``withhold`` flag of named cursors.

        Covers three things: ``withhold=True`` without a cursor name is
        refused with ProgrammingError; the flag can be switched on through
        the attribute; and it can be passed at creation time. In both
        enabled cases the rows are fetched after a commit.
        """
        self.assertRaises(psycopg2.ProgrammingError, self.conn.cursor,
                          withhold=True)

        query = "select data from withhold order by data"
        expected_rows = [(10,), (20,), (30,)]

        self._create_withhold_table()

        # A single variable is reused for every cursor so that each
        # rebinding drops the reference to the previous cursor object.
        cur = self.conn.cursor("W")
        self.assertEqual(cur.withhold, False)
        cur.withhold = True
        self.assertEqual(cur.withhold, True)
        cur.execute(query)
        self.conn.commit()
        self.assertEqual(cur.fetchall(), expected_rows)
        cur.close()

        # Same scenario with the flag given to the constructor.
        cur = self.conn.cursor("W", withhold=True)
        self.assertEqual(cur.withhold, True)
        cur.execute(query)
        self.conn.commit()
        self.assertEqual(cur.fetchall(), expected_rows)

        # Clean up the table created above.
        cur = self.conn.cursor()
        cur.execute("drop table withhold")
        self.conn.commit()
test_cursor.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_not_scrollable(self):
        """Check that a ``scrollable=False`` named cursor refuses to scroll back.

        ``scrollable=False`` without a cursor name must raise
        ProgrammingError. The test is skipped when a named cursor created
        with default scrollability already raises OperationalError on a
        backward scroll.
        """
        self.assertRaises(psycopg2.ProgrammingError, self.conn.cursor,
                          scrollable=False)

        # Populate a 100-row table to scroll over.
        cur = self.conn.cursor()
        cur.execute("create table scrollable (data int)")
        rows = [(n,) for n in range(100)]
        cur.executemany("insert into scrollable values (%s)", rows)
        cur.close()

        cur = self.conn.cursor("S")    # default scrollability
        cur.execute("select * from scrollable")
        self.assertEqual(cur.scrollable, None)
        cur.scroll(2)
        try:
            cur.scroll(-1)
        except psycopg2.OperationalError:
            return self.skipTest("can't evaluate non-scrollable cursor")
        cur.close()

        # Explicitly non-scrollable: forward works, backward must fail.
        cur = self.conn.cursor("S", scrollable=False)
        self.assertEqual(cur.scrollable, False)
        cur.execute("select * from scrollable")
        cur.scroll(2)
        self.assertRaises(psycopg2.OperationalError, cur.scroll, -1)
test_async.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_async_iter(self):
        """Check cursor iteration before and after waiting on a query.

        Iterating the cursor right after ``execute()`` must raise
        ProgrammingError; after ``self.wait()`` the same iteration yields
        the three inserted rows and the connection is no longer executing.
        """
        cursor = self.conn.cursor()

        cursor.execute("begin")
        self.wait(cursor)
        cursor.execute("""
            insert into table1 values (1);
            insert into table1 values (2);
            insert into table1 values (3);
        """)
        self.wait(cursor)
        cursor.execute("select id from table1 order by id")

        # the select has been issued but not waited for: iterating must fail
        self.assertRaises(psycopg2.ProgrammingError, list, cursor)

        # once the query has completed the rows can be consumed
        self.wait(cursor)
        fetched = list(cursor)
        self.assertEqual(fetched, [(1, ), (2, ), (3, )])
        self.assertFalse(self.conn.isexecuting())
adv_test_base_db_ext.py 文件源码 项目:djaio 作者: Sberned 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
async def test_base_db_ext_valid_and_wrong_inserts(self):
        """Check that a batch with one invalid INSERT leaves no row behind.

        The batch holds a valid insert followed by one that names the column
        ``name1``; executing it is expected to raise ProgrammingError.
        A follow-up select for the row of the first insert must return
        an empty list.

        Declared ``async`` because the body awaits coroutines; ``await`` in a
        plain ``def`` is a SyntaxError.
        """
        await self.init_db()
        with pytest.raises(psycopg2.ProgrammingError):
            await self.app.db.execute(db_name='test_db', query="""
                    INSERT INTO test_table(name) VALUES ('djaio_boo');
                    INSERT INTO test_table(name1) VALUES ('djaio');
            """, values=[], _type='select')
        # The first insert must not be visible after the failed batch.
        bad_val = await self.app.db.execute('test_db', "select * from test_table  where name='djaio_boo';", [], 'select')
        assert bad_val == []
import_genbank.py 文件源码 项目:db-import 作者: antismash 项目源码 文件源码 阅读 53 收藏 0 点赞 0 评论 0
def get_or_create_genome(rec, cur):
    '''Return the genome_id for the record's taxon, inserting a row if needed.

    rec: the record the tax id and strain are extracted from.
    cur: database cursor used for the lookup and the insert.

    If resolving the tax id raises psycopg2.ProgrammingError, the record is
    printed and the exception is re-raised.
    '''
    try:
        tax_number = get_taxid(rec)
        strain = get_strain(rec)
        taxid = get_or_create_tax_id(cur, tax_number, strain)
    except psycopg2.ProgrammingError:
        print(rec)
        raise

    cur.execute("SELECT genome_id FROM antismash.genomes WHERE tax_id = %s", (taxid,))
    row = cur.fetchone()
    if row is not None:
        return row[0]

    # No genome for this tax id yet: insert one and return the generated id.
    cur.execute("INSERT INTO antismash.genomes (tax_id) VALUES (%s) RETURNING genome_id;", (taxid,))
    return cur.fetchone()[0]
app.py 文件源码 项目:ravel 作者: ravel-net 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def load(self, db):
        """Load the application's SQL file into the specified database.

        db: a ravel.db.RavelDb instance into which the application will be
            loaded; the SQL is run through its ``cursor`` attribute.

        When ``self.sqlfile`` is None nothing is executed. A
        psycopg2.ProgrammingError raised while executing the file is printed
        and not re-raised.
        """
        if self.sqlfile is None:
            logger.debug("loaded application %s with no SQL file", self.name)
            return

        with open(self.sqlfile) as f:
            try:
                db.cursor.execute(f.read())
            except psycopg2.ProgrammingError as e:
                # "except ... as e" and the call form of print are valid on
                # both Python 2.6+ and Python 3; the former "except X, e" and
                # print-statement spellings do not compile on Python 3.
                print("Error loading app {0}: {1}".format(self.name, e))

        # NOTE(review): this is logged even when the SQL above failed --
        # confirm that is intended.
        logger.debug("loaded application %s", self.name)
test_slot.py 文件源码 项目:pg2kinesis 作者: handshake 项目源码 文件源码 阅读 50 收藏 0 点赞 0 评论 0
def test_create_slot(slot):
    """Check how ``slot.create_slot()`` reacts to errors from the cursor.

    Three cases, each with ``create_replication_slot`` replaced by a Mock:
    a ProgrammingError whose pgcode is DUPLICATE_OBJECT is swallowed; a
    ProgrammingError with another pgcode propagates; a generic Exception
    propagates. In every case the mock must have been called with the
    expected slot name and options.

    The call assertions are placed after the ``pytest.raises`` blocks: a
    statement following the raising call inside such a block never runs.
    """
    def assert_slot_requested():
        # Shared check that the replication slot was requested as expected.
        slot._repl_cursor.create_replication_slot.assert_called_with(
            'pg2kinesis',
            slot_type=psycopg2.extras.REPLICATION_LOGICAL,
            output_plugin=u'test_decoding')

    # Case 1: duplicate slot -> error is swallowed.
    with patch.object(psycopg2.ProgrammingError, 'pgcode',
                      new_callable=PropertyMock,
                      return_value=psycopg2.errorcodes.DUPLICATE_OBJECT):
        pe = psycopg2.ProgrammingError()

        slot._repl_cursor.create_replication_slot = Mock(side_effect=pe)
        slot.create_slot()
        assert_slot_requested()

    # Case 2: any other pgcode -> ProgrammingError propagates.
    with patch.object(psycopg2.ProgrammingError, 'pgcode',
                      new_callable=PropertyMock,
                      return_value=-1):
        pe = psycopg2.ProgrammingError()
        slot._repl_cursor.create_replication_slot = Mock(side_effect=pe)

        with pytest.raises(psycopg2.ProgrammingError) as e_info:
            slot.create_slot()
        assert_slot_requested()
        # pgcode is patched on the class, so read it while the patch is active.
        assert e_info.value.pgcode == -1

    # Case 3: unrelated exception -> propagates unchanged.
    slot._repl_cursor.create_replication_slot = Mock(side_effect=Exception)
    with pytest.raises(Exception):
        slot.create_slot()
    assert_slot_requested()
slot.py 文件源码 项目:pg2kinesis 作者: handshake 项目源码 文件源码 阅读 44 收藏 0 点赞 0 评论 0
def create_slot(self):
        """Create the logical replication slot named ``self.slot_name``.

        The slot is requested with the ``test_decoding`` output plugin. A
        ProgrammingError whose pgcode is DUPLICATE_OBJECT is logged at info
        level and swallowed; any other ProgrammingError is logged as an
        error and re-raised.
        """
        logger.info('Creating slot %s' % self.slot_name)
        try:
            self._repl_cursor.create_replication_slot(
                self.slot_name,
                slot_type=psycopg2.extras.REPLICATION_LOGICAL,
                output_plugin='test_decoding')
        except psycopg2.ProgrammingError as err:
            # DUPLICATE_OBJECT is what is raised when the slot already exists.
            already_exists = err.pgcode == psycopg2.errorcodes.DUPLICATE_OBJECT
            if already_exists:
                logger.info('Slot %s is already present.' % self.slot_name)
                return
            logger.error(err)
            raise
slot.py 文件源码 项目:pg2kinesis 作者: handshake 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def delete_slot(self):
        """Drop the replication slot named ``self.slot_name``.

        A ProgrammingError whose pgcode is UNDEFINED_OBJECT is logged at
        info level as "not found" and swallowed; any other ProgrammingError
        is logged as an error and re-raised.
        """
        logger.info('Deleting slot %s' % self.slot_name)
        try:
            self._repl_cursor.drop_replication_slot(self.slot_name)
        except psycopg2.ProgrammingError as err:
            # UNDEFINED_OBJECT is handled as "the slot does not exist".
            if err.pgcode == psycopg2.errorcodes.UNDEFINED_OBJECT:
                logger.info('Slot %s was not found.' % self.slot_name)
            else:
                logger.error(err)
                raise
testutils.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def skip_if_no_superuser(f):
    """Decorator: skip the test when it fails for lack of privileges.

    The wrapped test is called with ``self`` only. A ProgrammingError whose
    pgcode is INSUFFICIENT_PRIVILEGE is converted into ``self.skipTest()``;
    any other ProgrammingError is re-raised unchanged.
    """
    @wraps(f)
    def wrapper(self):
        from psycopg2 import ProgrammingError
        try:
            outcome = f(self)
        except ProgrammingError as exc:
            # errorcodes is only imported when an error actually occurs
            import psycopg2.errorcodes
            if exc.pgcode != psycopg2.errorcodes.INSUFFICIENT_PRIVILEGE:
                raise
            self.skipTest("skipped because not superuser")
        else:
            return outcome

    return wrapper
test_types_basic.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_no_mro_no_joy(self):
        """Adapting a subclass instance must fail when only the base is registered.

        An adapter is registered for a base class, then ``adapt()`` on an
        instance of a derived class is expected to raise ProgrammingError.
        The adapter is removed again whatever the outcome.

        NOTE(review): presumably only meaningful for classes defined without
        ``object`` as a base on Python 2 -- confirm against the test's decorators.
        """
        from psycopg2.extensions import adapt, register_adapter, AsIs

        class Parent:
            pass

        class Child(Parent):
            pass

        def as_literal(obj):
            return AsIs("a")

        register_adapter(Parent, as_literal)
        try:
            self.assertRaises(psycopg2.ProgrammingError, adapt, Child())
        finally:
            # undo the registration so other tests see a clean adapters map
            registry_key = (Parent, psycopg2.extensions.ISQLQuote)
            del psycopg2.extensions.adapters[registry_key]
test_replication.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def make_replication_events(self):
        """Recreate the ``dummy1`` table on a fresh connection and commit.

        Does nothing when ``self.connect()`` returns None. A ProgrammingError
        from the initial ``DROP TABLE`` is answered with a rollback and
        otherwise ignored.
        """
        connection = self.connect()
        if connection is None:
            return

        cursor = connection.cursor()
        try:
            cursor.execute("DROP TABLE dummy1")
        except psycopg2.ProgrammingError:
            # the drop failed: roll back before issuing the CREATE
            connection.rollback()

        create_sql = "CREATE TABLE dummy1 AS SELECT * FROM generate_series(1, 5) AS id"
        cursor.execute(create_sql)
        connection.commit()
test_replication.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 44 收藏 0 点赞 0 评论 0
def test_create_replication_slot(self):
        """Creating the same replication slot twice must raise ProgrammingError.

        Uses a PhysicalReplicationConnection; returns without checking
        anything when ``self.repl_connect()`` yields None.
        """
        connection = self.repl_connect(
            connection_factory=PhysicalReplicationConnection)
        if connection is None:
            return

        cursor = connection.cursor()

        # The first creation is expected to succeed, the repeat to fail.
        self.create_replication_slot(cursor)
        with self.assertRaises(psycopg2.ProgrammingError):
            self.create_replication_slot(cursor)
test_replication.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_async_replication(self):
        """Consume logical replication messages over an async connection.

        Returns without checking anything when ``self.repl_connect()``
        yields None. Otherwise a ``test_decoding`` slot is created,
        replication is started and events are generated. The test then
        checks that ``consume_stream()`` raises ProgrammingError on this
        connection, and that a manual ``read_message()`` loop ends with
        StopReplication, which the handler raises after more than three
        messages.
        """
        repl_conn = self.repl_connect(
            connection_factory=LogicalReplicationConnection, async_=1)
        if repl_conn is None:
            return

        repl_cur = repl_conn.cursor()

        self.create_replication_slot(repl_cur, output_plugin='test_decoding')
        self.wait(repl_cur)

        repl_cur.start_replication(self.slot)
        self.wait(repl_cur)

        self.make_replication_events()

        self.msg_count = 0

        def handle_message(message):
            # Result is discarded: this only exercises io_timestamp and repr().
            _ = "%s: %s" % (repl_cur.io_timestamp, repr(message))

            self.msg_count += 1
            if self.msg_count > 3:
                repl_cur.send_feedback(reply=True)
                raise StopReplication()

            repl_cur.send_feedback(flush_lsn=message.data_start)

        # cannot be used in asynchronous mode
        self.assertRaises(
            psycopg2.ProgrammingError, repl_cur.consume_stream, handle_message)

        def pump_messages():
            from select import select
            while True:
                message = repl_cur.read_message()
                if not message:
                    # nothing available yet: block until the cursor is readable
                    select([repl_cur], [], [])
                    continue
                handle_message(message)

        self.assertRaises(StopReplication, pump_messages)
test_connection.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def test_parse_dsn(self):
        """Check ``ext.parse_dsn()`` on valid, quoted and malformed DSN strings.

        Valid strings must be split into the expected dict; malformed ones
        must raise ProgrammingError, and the error text for a DSN holding a
        password must not contain that password.
        """
        from psycopg2 import ProgrammingError

        self.assertEqual(
            ext.parse_dsn('dbname=test user=tester password=secret'),
            dict(user='tester', password='secret', dbname='test'),
            "simple DSN parsed")

        self.assertRaises(ProgrammingError, ext.parse_dsn,
                          "dbname=test 2 user=tester password=secret")

        self.assertEqual(
            ext.parse_dsn("dbname='test 2' user=tester password=secret"),
            dict(user='tester', password='secret', dbname='test 2'),
            "DSN with quoting parsed")

        # Can't really use assertRaisesRegexp() here since we need to
        # make sure that secret is *not* exposed in the error message
        # (and it also requires python >= 2.7).
        raised = False
        try:
            # unterminated quote after dbname:
            ext.parse_dsn("dbname='test 2 user=tester password=secret")
        except ProgrammingError as e:
            raised = True
            self.assertTrue(str(e).find('secret') < 0,
                            "DSN was not exposed in error message")
        except Exception as e:
            # An except clause needs an exception class; the former
            # "except e:" evaluated an unbound name and so replaced any
            # unexpected error with a NameError.
            self.fail("unexpected error condition: " + repr(e))
        self.assertTrue(raised, "ProgrammingError raised due to invalid DSN")


问题


面经


文章

微信
公众号

扫码关注公众号