python类OperationalError()的实例源码

test_connection.py 文件源码 项目:userbase-sns-lambda 作者: fartashh 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_cleanup_on_badconn_close(self):
        # Regression test for ticket #148: terminate this connection's own
        # backend, then check the `closed` values seen before and after an
        # explicit close().
        conn = self.conn
        cur = conn.cursor()
        try:
            cur.execute("select pg_terminate_backend(pg_backend_pid())")
        except psycopg2.OperationalError as e:
            # Only the ADMIN_SHUTDOWN error code is tolerated here; any other
            # operational failure is re-raised.
            if e.pgcode != psycopg2.errorcodes.ADMIN_SHUTDOWN:
                raise
        except psycopg2.DatabaseError as e:
            # curiously when disconnected in green mode we get a DatabaseError
            # without pgcode.
            if e.pgcode is not None:
                raise

        # `closed` is expected to be 2 once the backend has gone away ...
        self.assertEqual(conn.closed, 2)
        conn.close()
        # ... and 1 after close() has been called explicitly.
        self.assertEqual(conn.closed, 1)
test_cursor.py 文件源码 项目:userbase-sns-lambda 作者: fartashh 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def test_not_scrollable(self):
        # Requesting scrollable=False without a cursor name must raise
        # ProgrammingError.
        self.assertRaises(
            psycopg2.ProgrammingError, self.conn.cursor, scrollable=False)

        # Populate a scratch table with the integers 0..99.
        rows = [(value,) for value in range(100)]
        setup_cur = self.conn.cursor()
        setup_cur.execute("create table scrollable (data int)")
        setup_cur.executemany("insert into scrollable values (%s)", rows)
        setup_cur.close()

        # Named cursor with default scrollability: the attribute reads None.
        default_cur = self.conn.cursor("S")
        default_cur.execute("select * from scrollable")
        self.assertEqual(default_cur.scrollable, None)
        default_cur.scroll(2)
        try:
            default_cur.scroll(-1)
        except psycopg2.OperationalError:
            # Backward scroll failed on the default cursor: skip the rest.
            return self.skipTest("can't evaluate non-scrollable cursor")
        else:
            default_cur.close()

        # Named cursor explicitly declared non-scrollable: a forward scroll
        # works, a backward scroll must raise OperationalError.
        fixed_cur = self.conn.cursor("S", scrollable=False)
        self.assertEqual(fixed_cur.scrollable, False)
        fixed_cur.execute("select * from scrollable")
        fixed_cur.scroll(2)
        self.assertRaises(psycopg2.OperationalError, fixed_cur.scroll, -1)
test_lobject.py 文件源码 项目:userbase-sns-lambda 作者: fartashh 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def tearDown(self):
        """Clean up what a large-object test left behind.

        Removes ``self.tmpdir`` if one is set.  When the connection is still
        open, rolls back, tries to unlink the large object identified by
        ``self.lo_oid`` (if any), and then runs ``ConnectingTestCase.tearDown``.
        When the connection is already closed nothing else is done.
        """
        scratch_dir = self.tmpdir
        if scratch_dir:
            shutil.rmtree(scratch_dir, ignore_errors=True)

        if not self.conn.closed:
            oid = self.lo_oid
            if oid is not None:
                self.conn.rollback()
                try:
                    leftover = self.conn.lobject(oid, "n")
                except psycopg2.OperationalError:
                    # The object could not be opened: nothing to unlink.
                    pass
                else:
                    leftover.unlink()

            ConnectingTestCase.tearDown(self)
test_connection.py 文件源码 项目:userbase-sns-lambda 作者: fartashh 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_cleanup_on_badconn_close(self):
        # Regression test for ticket #148: terminate this connection's own
        # backend, then check the `closed` values seen before and after an
        # explicit close().
        conn = self.conn
        cur = conn.cursor()
        try:
            cur.execute("select pg_terminate_backend(pg_backend_pid())")
        except psycopg2.OperationalError as e:
            # Only the ADMIN_SHUTDOWN error code is tolerated here; any other
            # operational failure is re-raised.
            if e.pgcode != psycopg2.errorcodes.ADMIN_SHUTDOWN:
                raise
        except psycopg2.DatabaseError as e:
            # curiously when disconnected in green mode we get a DatabaseError
            # without pgcode.
            if e.pgcode is not None:
                raise

        # `closed` is expected to be 2 once the backend has gone away ...
        self.assertEqual(conn.closed, 2)
        conn.close()
        # ... and 1 after close() has been called explicitly.
        self.assertEqual(conn.closed, 1)
test_cursor.py 文件源码 项目:userbase-sns-lambda 作者: fartashh 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_not_scrollable(self):
        # Requesting scrollable=False without a cursor name must raise
        # ProgrammingError.
        self.assertRaises(
            psycopg2.ProgrammingError, self.conn.cursor, scrollable=False)

        # Populate a scratch table with the integers 0..99.
        rows = [(value,) for value in range(100)]
        setup_cur = self.conn.cursor()
        setup_cur.execute("create table scrollable (data int)")
        setup_cur.executemany("insert into scrollable values (%s)", rows)
        setup_cur.close()

        # Named cursor with default scrollability: the attribute reads None.
        default_cur = self.conn.cursor("S")
        default_cur.execute("select * from scrollable")
        self.assertEqual(default_cur.scrollable, None)
        default_cur.scroll(2)
        try:
            default_cur.scroll(-1)
        except psycopg2.OperationalError:
            # Backward scroll failed on the default cursor: skip the rest.
            return self.skipTest("can't evaluate non-scrollable cursor")
        else:
            default_cur.close()

        # Named cursor explicitly declared non-scrollable: a forward scroll
        # works, a backward scroll must raise OperationalError.
        fixed_cur = self.conn.cursor("S", scrollable=False)
        self.assertEqual(fixed_cur.scrollable, False)
        fixed_cur.execute("select * from scrollable")
        fixed_cur.scroll(2)
        self.assertRaises(psycopg2.OperationalError, fixed_cur.scroll, -1)
test_connection.py 文件源码 项目:userbase-sns-lambda 作者: fartashh 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_cleanup_on_badconn_close(self):
        # Regression test for ticket #148: terminate this connection's own
        # backend, then check the `closed` values seen before and after an
        # explicit close().
        conn = self.conn
        cur = conn.cursor()
        try:
            cur.execute("select pg_terminate_backend(pg_backend_pid())")
        except psycopg2.OperationalError as e:
            # Only the ADMIN_SHUTDOWN error code is tolerated here; any other
            # operational failure is re-raised.
            if e.pgcode != psycopg2.errorcodes.ADMIN_SHUTDOWN:
                raise
        except psycopg2.DatabaseError as e:
            # curiously when disconnected in green mode we get a DatabaseError
            # without pgcode.
            if e.pgcode is not None:
                raise

        # `closed` is expected to be 2 once the backend has gone away ...
        self.assertEqual(conn.closed, 2)
        conn.close()
        # ... and 1 after close() has been called explicitly.
        self.assertEqual(conn.closed, 1)
test_cursor.py 文件源码 项目:userbase-sns-lambda 作者: fartashh 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_not_scrollable(self):
        # Requesting scrollable=False without a cursor name must raise
        # ProgrammingError.
        self.assertRaises(
            psycopg2.ProgrammingError, self.conn.cursor, scrollable=False)

        # Populate a scratch table with the integers 0..99.
        rows = [(value,) for value in range(100)]
        setup_cur = self.conn.cursor()
        setup_cur.execute("create table scrollable (data int)")
        setup_cur.executemany("insert into scrollable values (%s)", rows)
        setup_cur.close()

        # Named cursor with default scrollability: the attribute reads None.
        default_cur = self.conn.cursor("S")
        default_cur.execute("select * from scrollable")
        self.assertEqual(default_cur.scrollable, None)
        default_cur.scroll(2)
        try:
            default_cur.scroll(-1)
        except psycopg2.OperationalError:
            # Backward scroll failed on the default cursor: skip the rest.
            return self.skipTest("can't evaluate non-scrollable cursor")
        else:
            default_cur.close()

        # Named cursor explicitly declared non-scrollable: a forward scroll
        # works, a backward scroll must raise OperationalError.
        fixed_cur = self.conn.cursor("S", scrollable=False)
        self.assertEqual(fixed_cur.scrollable, False)
        fixed_cur.execute("select * from scrollable")
        fixed_cur.scroll(2)
        self.assertRaises(psycopg2.OperationalError, fixed_cur.scroll, -1)
test_lobject.py 文件源码 项目:userbase-sns-lambda 作者: fartashh 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def tearDown(self):
        """Clean up what a large-object test left behind.

        Removes ``self.tmpdir`` if one is set.  When the connection is still
        open, rolls back, tries to unlink the large object identified by
        ``self.lo_oid`` (if any), and then runs ``ConnectingTestCase.tearDown``.
        When the connection is already closed nothing else is done.
        """
        scratch_dir = self.tmpdir
        if scratch_dir:
            shutil.rmtree(scratch_dir, ignore_errors=True)

        if not self.conn.closed:
            oid = self.lo_oid
            if oid is not None:
                self.conn.rollback()
                try:
                    leftover = self.conn.lobject(oid, "n")
                except psycopg2.OperationalError:
                    # The object could not be opened: nothing to unlink.
                    pass
                else:
                    leftover.unlink()

            ConnectingTestCase.tearDown(self)
storage_resource_class.py 文件源码 项目:intel-manager-for-lustre 作者: intel-hpdd 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def filter_class_ids():
    """Wrapper to avoid importing storage_plugin_manager at module scope (it
    requires DB to construct itself) so that this module can be imported
    for e.g. building docs without a database.

    Return a list of storage resource class IDs which are valid for display (i.e.
    those for which we have a plugin available in this process).  An empty
    list is returned when the database cannot be used.
    """
    from psycopg2 import OperationalError
    from django.db.utils import DatabaseError
    try:
        from chroma_core.lib.storage_plugin.manager import storage_plugin_manager
        # Materialise the keys so callers always receive a real list: on
        # Python 3 ``.keys()`` alone would be a view, unlike the ``[]``
        # returned on the error path below.
        return list(storage_plugin_manager.resource_class_id_to_class.keys())
    except (OperationalError, DatabaseError):
        # OperationalError if the DB server can't be contacted
        # DatabaseError if the DB exists but isn't populated
        return []
testutils.py 文件源码 项目:Price-Comparator 作者: Thejas-1 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def repl_connect(self, **kwargs):
        """Return a connection set up for replication

        The connection is on "PSYCOPG2_TEST_REPL_DSN" unless overridden by
        a *dsn* kwarg.

        Should raise a skip test if not available, but guard for None on
        old Python versions.
        """
        if 'dsn' not in kwargs:
            kwargs['dsn'] = repl_dsn
        import psycopg2
        try:
            conn = self.connect(**kwargs)
        except psycopg2.OperationalError as e:
            # `as` binding is valid on Python 2.6+ and Python 3; the old
            # comma form is a SyntaxError on Python 3.
            return self.skipTest("replication db not configured: %s" % e)

        conn.autocommit = True
        return conn
test_cursor.py 文件源码 项目:Price-Comparator 作者: Thejas-1 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_not_scrollable(self):
        # Requesting scrollable=False without a cursor name must raise
        # ProgrammingError.
        self.assertRaises(
            psycopg2.ProgrammingError, self.conn.cursor, scrollable=False)

        # Populate a scratch table with the integers 0..99.
        rows = [(value,) for value in range(100)]
        setup_cur = self.conn.cursor()
        setup_cur.execute("create table scrollable (data int)")
        setup_cur.executemany("insert into scrollable values (%s)", rows)
        setup_cur.close()

        # Named cursor with default scrollability: the attribute reads None.
        default_cur = self.conn.cursor("S")
        default_cur.execute("select * from scrollable")
        self.assertEqual(default_cur.scrollable, None)
        default_cur.scroll(2)
        try:
            default_cur.scroll(-1)
        except psycopg2.OperationalError:
            # Backward scroll failed on the default cursor: skip the rest.
            return self.skipTest("can't evaluate non-scrollable cursor")
        else:
            default_cur.close()

        # Named cursor explicitly declared non-scrollable: a forward scroll
        # works, a backward scroll must raise OperationalError.
        fixed_cur = self.conn.cursor("S", scrollable=False)
        self.assertEqual(fixed_cur.scrollable, False)
        fixed_cur.execute("select * from scrollable")
        fixed_cur.scroll(2)
        self.assertRaises(psycopg2.OperationalError, fixed_cur.scroll, -1)
test_lobject.py 文件源码 项目:Price-Comparator 作者: Thejas-1 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def tearDown(self):
        """Clean up what a large-object test left behind.

        Removes ``self.tmpdir`` if one is set.  When the connection is still
        open, rolls back, tries to unlink the large object identified by
        ``self.lo_oid`` (if any), and then runs ``ConnectingTestCase.tearDown``.
        When the connection is already closed nothing else is done.
        """
        scratch_dir = self.tmpdir
        if scratch_dir:
            shutil.rmtree(scratch_dir, ignore_errors=True)

        if not self.conn.closed:
            oid = self.lo_oid
            if oid is not None:
                self.conn.rollback()
                try:
                    leftover = self.conn.lobject(oid, "n")
                except psycopg2.OperationalError:
                    # The object could not be opened: nothing to unlink.
                    pass
                else:
                    leftover.unlink()

            ConnectingTestCase.tearDown(self)
test_connection.py 文件源码 项目:nmbs-realtime-feed 作者: datamindedbe 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def test_cleanup_on_badconn_close(self):
        # Regression test for ticket #148: terminate this connection's own
        # backend, then check the `closed` values seen before and after an
        # explicit close().
        conn = self.conn
        cur = conn.cursor()
        try:
            cur.execute("select pg_terminate_backend(pg_backend_pid())")
        except psycopg2.OperationalError as e:
            # Only the ADMIN_SHUTDOWN error code is tolerated here; any other
            # operational failure is re-raised.
            if e.pgcode != psycopg2.errorcodes.ADMIN_SHUTDOWN:
                raise
        except psycopg2.DatabaseError as e:
            # curiously when disconnected in green mode we get a DatabaseError
            # without pgcode.
            if e.pgcode is not None:
                raise

        # `closed` is expected to be 2 once the backend has gone away ...
        self.assertEqual(conn.closed, 2)
        conn.close()
        # ... and 1 after close() has been called explicitly.
        self.assertEqual(conn.closed, 1)
test_cursor.py 文件源码 项目:nmbs-realtime-feed 作者: datamindedbe 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_not_scrollable(self):
        # Requesting scrollable=False without a cursor name must raise
        # ProgrammingError.
        self.assertRaises(
            psycopg2.ProgrammingError, self.conn.cursor, scrollable=False)

        # Populate a scratch table with the integers 0..99.
        rows = [(value,) for value in range(100)]
        setup_cur = self.conn.cursor()
        setup_cur.execute("create table scrollable (data int)")
        setup_cur.executemany("insert into scrollable values (%s)", rows)
        setup_cur.close()

        # Named cursor with default scrollability: the attribute reads None.
        default_cur = self.conn.cursor("S")
        default_cur.execute("select * from scrollable")
        self.assertEqual(default_cur.scrollable, None)
        default_cur.scroll(2)
        try:
            default_cur.scroll(-1)
        except psycopg2.OperationalError:
            # Backward scroll failed on the default cursor: skip the rest.
            return self.skipTest("can't evaluate non-scrollable cursor")
        else:
            default_cur.close()

        # Named cursor explicitly declared non-scrollable: a forward scroll
        # works, a backward scroll must raise OperationalError.
        fixed_cur = self.conn.cursor("S", scrollable=False)
        self.assertEqual(fixed_cur.scrollable, False)
        fixed_cur.execute("select * from scrollable")
        fixed_cur.scroll(2)
        self.assertRaises(psycopg2.OperationalError, fixed_cur.scroll, -1)
database.py 文件源码 项目:betting-crawler 作者: HaraldNordgren 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def __init__(self):
        """Open the action log, set naming defaults and connect to the database.

        If the first ``connect()`` raises ``psycopg2.OperationalError``,
        ``create_database()`` is called and the connection is attempted once
        more.  A cursor using ``psycopg2.extras.DictCursor`` is then stored on
        ``self.cursor`` and ``create_table()`` is invoked.
        """
        # Log file handle opened in append mode and kept on the instance.
        self.log = open('.database_actions.log', 'a')

        # Database and table identifiers.
        self.database_name = "odds_data"
        self.matches_table = "matches"

        # Column names used for the odds values and the timestamp.
        self.odds_cols = ['1', 'X', '2']
        self.timestamp_col = "timestamp"

        self.teams = teams()

        try:
            self.connect()
        except psycopg2.OperationalError:
            # Presumably the database does not exist yet (TODO confirm):
            # create it and retry the connection once.
            self.create_database()
            self.connect()

        make_cursor = self.connection.cursor
        self.cursor = make_cursor(cursor_factory=psycopg2.extras.DictCursor)
        self.create_table()
failover.py 文件源码 项目:NZ-ORCID-Hub 作者: Royal-Society-of-New-Zealand 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def _connect(self, database, encoding=None, **kwargs):
        """Attempt to connect to the DB.

        In case the server is not reachable, 'fail over' to the back-up
        stand-by DB and promote it to the master DB: connect to
        ``self.failover_host``, run ``promote_standby()`` there, and swap
        ``self.connect_kwargs["host"]`` with ``self.failover_host`` so that
        later connections go to the new host.

        Any ``OperationalError`` whose message does not indicate an
        unreachable or unresolvable host is re-raised unchanged.
        """
        try:
            return super()._connect(database, encoding=encoding, **kwargs)
        except OperationalError as ex:
            # Read the message defensively: an empty ``args`` or a non-string
            # first argument must not mask the original error.
            message = str(ex.args[0]) if ex.args else ""
            host_unreachable = (
                "could not connect to server" in message
                or "could not translate host name" in message)
            if not host_unreachable:
                # Not a connectivity problem: propagate with the original
                # traceback and without logging a fail-over.
                raise

            logging.info("Failing over to %s", self.failover_host)
            kwargs["host"] = self.failover_host
            conn = super()._connect(database, encoding=encoding, **kwargs)
            with conn.cursor() as cr:
                cr.execute("SELECT promote_standby();")
            # The stand-by is now the primary; remember the old primary as
            # the new fail-over target.
            self.connect_kwargs["host"], self.failover_host = (
                self.failover_host, self.connect_kwargs["host"])
            return conn
test_connection.py 文件源码 项目:aws-lambda-redshift-copy 作者: christianhxc 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def test_cleanup_on_badconn_close(self):
        # Regression test for ticket #148: terminate this connection's own
        # backend, then check the `closed` values seen before and after an
        # explicit close().
        conn = self.conn
        cur = conn.cursor()
        try:
            cur.execute("select pg_terminate_backend(pg_backend_pid())")
        except psycopg2.OperationalError as e:
            # Only the ADMIN_SHUTDOWN error code is tolerated here; any other
            # operational failure is re-raised.
            if e.pgcode != psycopg2.errorcodes.ADMIN_SHUTDOWN:
                raise
        except psycopg2.DatabaseError as e:
            # curiously when disconnected in green mode we get a DatabaseError
            # without pgcode.
            if e.pgcode is not None:
                raise

        # `closed` is expected to be 2 once the backend has gone away ...
        self.assertEqual(conn.closed, 2)
        conn.close()
        # ... and 1 after close() has been called explicitly.
        self.assertEqual(conn.closed, 1)
test_cursor.py 文件源码 项目:aws-lambda-redshift-copy 作者: christianhxc 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_not_scrollable(self):
        # Requesting scrollable=False without a cursor name must raise
        # ProgrammingError.
        self.assertRaises(
            psycopg2.ProgrammingError, self.conn.cursor, scrollable=False)

        # Populate a scratch table with the integers 0..99.
        rows = [(value,) for value in range(100)]
        setup_cur = self.conn.cursor()
        setup_cur.execute("create table scrollable (data int)")
        setup_cur.executemany("insert into scrollable values (%s)", rows)
        setup_cur.close()

        # Named cursor with default scrollability: the attribute reads None.
        default_cur = self.conn.cursor("S")
        default_cur.execute("select * from scrollable")
        self.assertEqual(default_cur.scrollable, None)
        default_cur.scroll(2)
        try:
            default_cur.scroll(-1)
        except psycopg2.OperationalError:
            # Backward scroll failed on the default cursor: skip the rest.
            return self.skipTest("can't evaluate non-scrollable cursor")
        else:
            default_cur.close()

        # Named cursor explicitly declared non-scrollable: a forward scroll
        # works, a backward scroll must raise OperationalError.
        fixed_cur = self.conn.cursor("S", scrollable=False)
        self.assertEqual(fixed_cur.scrollable, False)
        fixed_cur.execute("select * from scrollable")
        fixed_cur.scroll(2)
        self.assertRaises(psycopg2.OperationalError, fixed_cur.scroll, -1)
test_lobject.py 文件源码 项目:aws-lambda-redshift-copy 作者: christianhxc 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def tearDown(self):
        """Clean up what a large-object test left behind.

        Removes ``self.tmpdir`` if one is set.  When the connection is still
        open, rolls back, tries to unlink the large object identified by
        ``self.lo_oid`` (if any), and then runs ``ConnectingTestCase.tearDown``.
        When the connection is already closed nothing else is done.
        """
        scratch_dir = self.tmpdir
        if scratch_dir:
            shutil.rmtree(scratch_dir, ignore_errors=True)

        if not self.conn.closed:
            oid = self.lo_oid
            if oid is not None:
                self.conn.rollback()
                try:
                    leftover = self.conn.lobject(oid, "n")
                except psycopg2.OperationalError:
                    # The object could not be opened: nothing to unlink.
                    pass
                else:
                    leftover.unlink()

            ConnectingTestCase.tearDown(self)
checkdb.py 文件源码 项目:docker-taiga 作者: douglasmiranda 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def postgres_connection(connection_string, retry_counter=1):
    """Connect to Postgres, retrying on ``psycopg2.OperationalError``.

    Args:
        connection_string: passed unchanged to ``psycopg2.connect``.
        retry_counter: number of the current attempt; it is incremented after
            every failed attempt.

    Returns:
        The connection object returned by ``psycopg2.connect``.

    After a failed attempt the function sleeps ``SLEEP_INTERVAL`` and tries
    again.  Once ``retry_counter`` exceeds ``LIMIT_RETRIES`` the failure is
    logged and the process exits with status 1.
    """
    # Iterative retry: behaves like the recursive form but does not grow the
    # call stack with every attempt.
    while True:
        try:
            return psycopg2.connect(connection_string)
        except psycopg2.OperationalError as e:
            if retry_counter > LIMIT_RETRIES:
                logging.error("CAN'T CONNECT TO POSTGRES")
                logging.error("Check your connection settings.")
                logging.error("Or increase (in docker-compose.yml):")
                logging.error(
                    "TAIGA_DB_CHECK_SLEEP_INTERVAL / TAIGA_DB_CHECK_LIMIT_RETRIES."
                )
                logging.error("Exception message: %s", e)
                sys.exit(1)

            logging.warning("Can't connect to Postgres. Will try again...")
            time.sleep(SLEEP_INTERVAL)
            retry_counter += 1


问题


面经


文章

微信
公众号

扫码关注公众号