python类connect()的实例源码

create_db.py 文件源码 项目:postpy 作者: portfoliome 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def main():
    """Drop and recreate the database named by ``PGDATABASE``.

    Connects to the ``postgres`` database using ``PGHOST``, ``PGUSER`` and
    ``PGPASSWORD`` from the environment, then runs ``DROP DATABASE IF EXISTS``
    followed by ``CREATE DATABASE`` for the target name.

    Raises:
        KeyError: if one of the environment variables is not set.
        SystemExit: if psycopg2 reports an error while running the DDL.
            Errors raised by ``connect`` itself are not translated.
    """
    db_name = os.environ['PGDATABASE']
    connection_parameters = {
        'host': os.environ['PGHOST'],
        'database': 'postgres',
        'user': os.environ['PGUSER'],
        'password': os.environ['PGPASSWORD']
    }
    # NOTE(review): db_name is interpolated verbatim as an identifier; it is
    # assumed to come from a trusted environment.
    drop_statement = 'DROP DATABASE IF EXISTS {};'.format(db_name)
    ddl_statement = 'CREATE DATABASE {};'.format(db_name)
    conn = connect(**connection_parameters)
    # Autocommit keeps the statements out of a transaction block, which
    # PostgreSQL requires for CREATE/DROP DATABASE.
    conn.autocommit = True

    try:
        with conn.cursor() as cursor:
            cursor.execute(drop_statement)
            cursor.execute(ddl_statement)
    except psycopg2.Error:
        raise SystemExit(
            'Failed to setup Postgres environment.\n{0}'.format(sys.exc_info())
        )
    else:
        sys.stdout.write('Created database environment successfully.\n')
    finally:
        # Release the connection on the failure path as well.
        conn.close()
db.py 文件源码 项目:ravel 作者: ravel-net 项目源码 文件源码 阅读 101 收藏 0 点赞 0 评论 0
def create(self):
        """Create the database named ``self.name`` unless it already exists.

        Connects to the ``postgres`` database as ``self.user``, lists the
        non-template databases and issues ``CREATE DATABASE`` when
        ``self.name`` is not among them.  A ``psycopg2.DatabaseError`` is
        logged as a warning instead of being propagated.
        """
        conn = None
        try:
            conn = psycopg2.connect(database="postgres",
                                    user=self.user,
                                    password=self.passwd)
            conn.set_isolation_level(ISOLEVEL)
            cursor = conn.cursor()
            cursor.execute("SELECT datname FROM pg_database WHERE " +
                           "datistemplate = false;")
            existing = [row[0] for row in cursor.fetchall()]

            if self.name not in existing:
                cursor.execute("CREATE DATABASE %s;" % self.name)
                logger.debug("created databse %s", self.name)
        except psycopg2.DatabaseError as e:
            logger.warning("error creating database: %s", self.fmt_errmsg(e))
        finally:
            # conn is still None when connect() itself failed; closing it
            # unconditionally would raise AttributeError from the finally
            # clause right after the warning was logged.
            if conn:
                conn.close()
create_pg_schema.py 文件源码 项目:zappa-django-utils 作者: Miserlou 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def handle(self, *args, **options):
        """Create the schema ``settings.SCHEMA`` in the default database.

        Connection settings are read from ``settings.DATABASES['default']``.
        Progress messages are written to ``self.stdout``.  Errors from the
        database driver propagate to the caller; the cursor and connection
        are closed either way.
        """
        self.stdout.write(self.style.SUCCESS('Starting Schema creation..'))

        dbname = settings.DATABASES['default']['NAME']
        user = settings.DATABASES['default']['USER']
        password = settings.DATABASES['default']['PASSWORD']
        host = settings.DATABASES['default']['HOST']

        con = connect(dbname=dbname, user=user, host=host, password=password)
        try:
            self.stdout.write(self.style.SUCCESS('Adding schema {schema} to database {dbname}'
                                                 .format(schema=settings.SCHEMA, dbname=dbname)))
            con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)

            cur = con.cursor()
            try:
                cur.execute('CREATE SCHEMA {schema};'.format(schema=settings.SCHEMA))
            finally:
                cur.close()
        finally:
            # Do not leave the connection open when the statement fails.
            con.close()

        self.stdout.write(self.style.SUCCESS('All Done!'))
drop_pg_schema.py 文件源码 项目:zappa-django-utils 作者: Miserlou 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def handle(self, *args, **options):
        """Drop the schema ``settings.SCHEMA`` (with CASCADE) from the default database.

        Connection settings are read from ``settings.DATABASES['default']``.
        Progress messages are written to ``self.stdout``.  Errors from the
        database driver propagate to the caller; the cursor and connection
        are closed either way.
        """
        self.stdout.write(self.style.SUCCESS('Starting Schema deletion..'))

        dbname = settings.DATABASES['default']['NAME']
        user = settings.DATABASES['default']['USER']
        password = settings.DATABASES['default']['PASSWORD']
        host = settings.DATABASES['default']['HOST']

        con = connect(dbname=dbname, user=user, host=host, password=password)
        try:
            self.stdout.write(self.style.SUCCESS('Removing schema {schema} from database {dbname}'
                                                 .format(schema=settings.SCHEMA, dbname=dbname)))
            con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)

            cur = con.cursor()
            try:
                cur.execute('DROP SCHEMA {schema} CASCADE;'.format(schema=settings.SCHEMA))
            finally:
                cur.close()
        finally:
            # Do not leave the connection open when the statement fails.
            con.close()

        self.stdout.write(self.style.SUCCESS('All Done.'))
create_pg_db.py 文件源码 项目:zappa-django-utils 作者: Miserlou 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def handle(self, *args, **options):
        """Create the database named in ``settings.DATABASES['default']``.

        Connects to the ``postgres`` database on the configured host with the
        configured credentials and issues ``CREATE DATABASE`` in autocommit
        mode.  Progress messages are written to ``self.stdout``.  Errors from
        the database driver propagate to the caller; the cursor and
        connection are closed either way.
        """
        self.stdout.write(self.style.SUCCESS('Starting DB creation..'))

        dbname = settings.DATABASES['default']['NAME']
        user = settings.DATABASES['default']['USER']
        password = settings.DATABASES['default']['PASSWORD']
        host = settings.DATABASES['default']['HOST']

        self.stdout.write(self.style.SUCCESS('Connecting to host..'))
        con = connect(dbname='postgres', user=user, host=host, password=password)
        try:
            con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)

            self.stdout.write(self.style.SUCCESS('Creating database'))
            cur = con.cursor()
            try:
                # NOTE(review): dbname is concatenated verbatim; it is
                # assumed to be a trusted identifier from settings.
                cur.execute('CREATE DATABASE ' + dbname)
            finally:
                cur.close()
        finally:
            # Do not leave the connection open when the statement fails.
            con.close()

        self.stdout.write(self.style.SUCCESS('All done!'))
importolddudel.py 文件源码 项目:Bitpoll 作者: fsinfuhh 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def resolve_user_or_group(self, old_id):
        """Resolve an id from old dudel to a local user or group.

        The id is looked up in the old ``user`` table first and, when no row
        matches, in the old ``group`` table.  The name found is then resolved
        through ``get_user_model()`` or ``Group`` respectively.

        Returns the matching object, or ``None`` when the id is in neither
        table or the name has no local counterpart.
        """
        # connect to db
        conn = psycopg2.connect(self.conn_string)
        try:
            cursor = conn.cursor()

            cursor.execute('SELECT username FROM "user" WHERE id=%s', (old_id,))
            username = cursor.fetchone()
            if username:
                return get_user_model().objects.get(username=username[0])

            cursor.execute('SELECT name FROM "group" WHERE id=%s', (old_id,))
            groupname = cursor.fetchone()
            if groupname:
                return Group.objects.get(name=groupname[0])
            return None
        except ObjectDoesNotExist:
            return None
        finally:
            # A connection is opened per call, so it has to be released here;
            # otherwise every lookup leaves one behind.
            conn.close()
db.py 文件源码 项目:ravel 作者: ravel-net 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, name, user, base, passwd=None, reconnect=False):
        """Store the connection settings and, unless reconnecting, set up
           the database.

           name: the name of the database to connect to
           user: the username to use to connect
           base: a file containing the SQL implementation for Ravel's base
           passwd: the password to connect to the database
           reconnect: true to connect to an existing database setup, false
           to load a new instance of Ravel's base into the database"""
        self.name = name
        self.user = user
        self.passwd = passwd
        self.base = base
        self.cleaned = not reconnect
        self._cursor = None
        self._conn = None

        # Reconnecting to an existing setup: nothing to initialise.
        if reconnect:
            return

        if self.num_connections() > 0:
            # Somebody else is using the database; leave it untouched.
            logger.warning("existing connections to database, skipping reinit")
            self.cleaned = False
        else:
            self.init()
            self.cleaned = True
db.py 文件源码 项目:ravel 作者: ravel-net 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def clean(self):
        """Clean the database of any existing Ravel components.

        Closes ``self.conn``, then connects to the ``postgres`` database and
        drops the database named ``self.name``.  A ``psycopg2.DatabaseError``
        is logged as a warning instead of being propagated.
        """
        # close existing connections
        self.conn.close()

        conn = None
        try:
            conn = psycopg2.connect(database="postgres",
                                    user=self.user,
                                    password=self.passwd)
            conn.set_isolation_level(ISOLEVEL)
            cursor = conn.cursor()
            cursor.execute("drop database %s" % self.name)
        except psycopg2.DatabaseError as e:
            # "as" form: valid on Python 2.6+ and Python 3, unlike "except X, e".
            logger.warning("error cleaning database: %s", self.fmt_errmsg(e))
        finally:
            # conn stays None when connect() itself failed.
            if conn:
                conn.close()
dbase.py 文件源码 项目:corporadb 作者: nlesc-sherlock 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def connectToDB(dbName=None, userName=None, dbPassword=None, dbHost=None,
                dbPort=None, dbCursor=psycopg2.extras.DictCursor):
    '''
    Connect to a specified PostgreSQL DB and return connection and cursor objects.

    dbName, userName, dbHost and dbPassword are passed to psycopg2 only when
    they are neither None nor empty; dbPort is passed whenever it is not None.
    The numpy float64/int64 adapters are registered after the connection is
    established.

    Returns a (connection, cursor) tuple; the cursor is created with
    cursor_factory=dbCursor.  Exceptions raised by psycopg2.connect propagate
    unchanged.
    '''
    # Hand the settings over as keyword arguments instead of concatenating a
    # DSN string: values containing quotes or backslashes no longer corrupt
    # the connection string, and the dbName=None default no longer fails on
    # string concatenation.
    params = {}
    if dbName:
        params['dbname'] = dbName
    if userName:
        params['user'] = userName
    if dbHost:
        params['host'] = dbHost
    if dbPassword:
        params['password'] = dbPassword
    if dbPort is not None:
        params['port'] = dbPort

    # Start DB connection
    connection = psycopg2.connect(**params)
    register_adapter(numpy.float64, addapt_numpy_float64)
    register_adapter(numpy.int64, addapt_numpy_int64)

    # if the connection succeeded get a cursor
    cursor = connection.cursor(cursor_factory=dbCursor)
    return connection, cursor
testutils.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def connect(self, **kwargs):
        """Open a psycopg2 connection and record it in ``self._conns``.

        A ``dsn`` keyword overrides the module-level ``dsn``; every other
        keyword is forwarded to ``psycopg2.connect``.  Raises AttributeError
        with a hint when ``self._conns`` does not exist.
        """
        try:
            tracked = self._conns
        except AttributeError as e:
            raise AttributeError(
                "%s (did you forget to call ConnectingTestCase.setUp()?)"
                % e)

        conninfo = kwargs.pop('dsn') if 'dsn' in kwargs else dsn
        import psycopg2
        new_conn = psycopg2.connect(conninfo, **kwargs)
        tracked.append(new_conn)
        return new_conn
testutils.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def skip_if_tpc_disabled(f):
    """Skip a test if the server has tpc support disabled.

    The decorated test runs only when ``SHOW max_prepared_transactions``
    succeeds and reports a non-zero value; otherwise ``self.skipTest`` is
    called with an explanatory message.
    """
    @wraps(f)
    def skip_if_tpc_disabled_(self):
        from psycopg2 import ProgrammingError
        cnn = self.connect()
        try:
            cur = cnn.cursor()
            try:
                cur.execute("SHOW max_prepared_transactions;")
            except ProgrammingError:
                return self.skipTest(
                    "server too old: two phase transactions not supported.")
            mtp = int(cur.fetchone()[0])
        finally:
            # Close the probe connection on the skip path too, not only
            # after a successful query.
            cnn.close()

        if not mtp:
            return self.skipTest(
                "server not configured for two phase transactions. "
                "set max_prepared_transactions to > 0 to run the test")
        return f(self)

    return skip_if_tpc_disabled_
test_connection.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_concurrent_execution(self):
        """Two threads each sleeping 4s in the server must finish in under 7s."""
        def sleeper():
            conn = self.connect()
            cursor = conn.cursor()
            cursor.execute("select pg_sleep(4)")
            cursor.close()
            conn.close()

        workers = [threading.Thread(target=sleeper) for _ in range(2)]
        started = time.time()
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
        # If the two sleeps had run one after the other this would take ~8s.
        self.assertTrue(time.time() - started < 7,
            "something broken in concurrency")
test_connection.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_set_isolation_level_default(self):
        """ISOLATION_LEVEL_DEFAULT falls back to the session's default level."""
        cnn = self.connect()
        cur = cnn.cursor()

        def shown_level():
            cur.execute("show transaction_isolation")
            return cur.fetchone()[0]

        # Set the session default while in autocommit.
        cnn.autocommit = True
        cur.execute("set default_transaction_isolation to 'read committed'")

        cnn.autocommit = False
        cnn.set_isolation_level(ext.ISOLATION_LEVEL_SERIALIZABLE)
        self.assertEqual(cnn.isolation_level,
            ext.ISOLATION_LEVEL_SERIALIZABLE)
        self.assertEqual(shown_level(), "serializable")

        cnn.rollback()
        cnn.set_isolation_level(ext.ISOLATION_LEVEL_DEFAULT)
        self.assertEqual(shown_level(), "read committed")
test_connection.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_isolation_level_read_committed(self):
        """Each connection sees the other's row only after it is committed."""
        cnn1 = self.connect()
        cnn2 = self.connect()
        cnn2.set_isolation_level(ext.ISOLATION_LEVEL_READ_COMMITTED)

        def rows_seen(cursor):
            cursor.execute("select count(*) from isolevel;")
            return cursor.fetchone()[0]

        cur1 = cnn1.cursor()
        self.assertEqual(0, rows_seen(cur1))
        cnn1.commit()

        cur2 = cnn2.cursor()
        cur2.execute("insert into isolevel values (10);")
        cur1.execute("insert into isolevel values (20);")

        # cnn2 counts only its own uncommitted row ...
        self.assertEqual(1, rows_seen(cur2))
        cnn1.commit()
        # ... and both rows once cnn1 has committed.
        self.assertEqual(2, rows_seen(cur2))

        # Same check from the other side.
        self.assertEqual(1, rows_seen(cur1))
        cnn2.commit()
        self.assertEqual(2, rows_seen(cur1))
test_connection.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def clear_test_xacts(self):
        """Rollback all the prepared transaction in the testing db."""
        conn = self.connect()
        conn.set_isolation_level(0)
        cursor = conn.cursor()
        try:
            cursor.execute(
                "select gid from pg_prepared_xacts where database = %s",
                (dbname,))
        except psycopg2.ProgrammingError:
            # The query was rejected: nothing to clean up.
            conn.rollback()
            conn.close()
            return
        else:
            # Collect the ids first: the same cursor is reused for the rollbacks.
            pending = [row[0] for row in cursor]

        for gid in pending:
            cursor.execute("rollback prepared %s;", (gid,))
        conn.close()
test_connection.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_tpc_commit(self):
        """Two-phase commit: begin, insert, prepare, commit."""
        def expect(xacts, records):
            self.assertEqual(xacts, self.count_xacts())
            self.assertEqual(records, self.count_test_records())

        conn = self.connect()
        txn_id = conn.xid(1, "gtrid", "bqual")
        self.assertEqual(conn.status, ext.STATUS_READY)

        conn.tpc_begin(txn_id)
        self.assertEqual(conn.status, ext.STATUS_BEGIN)

        cursor = conn.cursor()
        cursor.execute("insert into test_tpc values ('test_tpc_commit');")
        expect(0, 0)

        conn.tpc_prepare()
        self.assertEqual(conn.status, ext.STATUS_PREPARED)
        expect(1, 0)

        conn.tpc_commit()
        self.assertEqual(conn.status, ext.STATUS_READY)
        expect(0, 1)
test_connection.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_tpc_commit_one_phase(self):
        """Commit a TPC transaction directly, without a prepare step."""
        def expect(xacts, records):
            self.assertEqual(xacts, self.count_xacts())
            self.assertEqual(records, self.count_test_records())

        conn = self.connect()
        txn_id = conn.xid(1, "gtrid", "bqual")
        self.assertEqual(conn.status, ext.STATUS_READY)

        conn.tpc_begin(txn_id)
        self.assertEqual(conn.status, ext.STATUS_BEGIN)

        cursor = conn.cursor()
        cursor.execute("insert into test_tpc values ('test_tpc_commit_1p');")
        expect(0, 0)

        conn.tpc_commit()
        self.assertEqual(conn.status, ext.STATUS_READY)
        expect(0, 1)
test_connection.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_tpc_commit_recovered(self):
        """Prepare on one connection, then commit by xid from a second one."""
        def expect(xacts, records):
            self.assertEqual(xacts, self.count_xacts())
            self.assertEqual(records, self.count_test_records())

        first = self.connect()
        txn_id = first.xid(1, "gtrid", "bqual")
        self.assertEqual(first.status, ext.STATUS_READY)

        first.tpc_begin(txn_id)
        self.assertEqual(first.status, ext.STATUS_BEGIN)

        cursor = first.cursor()
        cursor.execute("insert into test_tpc values ('test_tpc_commit_rec');")
        expect(0, 0)

        first.tpc_prepare()
        first.close()
        expect(1, 0)

        # A new connection rebuilds the same xid and finishes the transaction.
        second = self.connect()
        txn_id = second.xid(1, "gtrid", "bqual")
        second.tpc_commit(txn_id)

        self.assertEqual(second.status, ext.STATUS_READY)
        expect(0, 1)
test_connection.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_tpc_rollback(self):
        """Two-phase rollback: begin, insert, prepare, roll back."""
        def expect(xacts, records):
            self.assertEqual(xacts, self.count_xacts())
            self.assertEqual(records, self.count_test_records())

        conn = self.connect()
        txn_id = conn.xid(1, "gtrid", "bqual")
        self.assertEqual(conn.status, ext.STATUS_READY)

        conn.tpc_begin(txn_id)
        self.assertEqual(conn.status, ext.STATUS_BEGIN)

        cursor = conn.cursor()
        cursor.execute("insert into test_tpc values ('test_tpc_rollback');")
        expect(0, 0)

        conn.tpc_prepare()
        self.assertEqual(conn.status, ext.STATUS_PREPARED)
        expect(1, 0)

        conn.tpc_rollback()
        self.assertEqual(conn.status, ext.STATUS_READY)
        expect(0, 0)
test_connection.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_tpc_rollback_recovered(self):
        """Prepare on one connection, then roll back by xid from a second one."""
        def expect(xacts, records):
            self.assertEqual(xacts, self.count_xacts())
            self.assertEqual(records, self.count_test_records())

        first = self.connect()
        txn_id = first.xid(1, "gtrid", "bqual")
        self.assertEqual(first.status, ext.STATUS_READY)

        first.tpc_begin(txn_id)
        self.assertEqual(first.status, ext.STATUS_BEGIN)

        cursor = first.cursor()
        cursor.execute("insert into test_tpc values ('test_tpc_commit_rec');")
        expect(0, 0)

        first.tpc_prepare()
        first.close()
        expect(1, 0)

        # A new connection rebuilds the same xid and discards the transaction.
        second = self.connect()
        txn_id = second.xid(1, "gtrid", "bqual")
        second.tpc_rollback(txn_id)

        self.assertEqual(second.status, ext.STATUS_READY)
        expect(0, 0)
test_connection.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_xid_roundtrip(self):
        """An xid recovered with tpc_recover() matches the one that was prepared."""
        cases = [
            (0, "", ""),
            (42, "gtrid", "bqual"),
            (0x7fffffff, "x" * 64, "y" * 64),
        ]
        for fid, gtrid, bqual in cases:
            preparer = self.connect()
            prepared_xid = preparer.xid(fid, gtrid, bqual)
            preparer.tpc_begin(prepared_xid)
            preparer.tpc_prepare()
            preparer.close()

            recoverer = self.connect()
            recovered = [x for x in recoverer.tpc_recover()
                         if x.database == dbname]
            self.assertEqual(1, len(recovered))
            found = recovered[0]
            self.assertEqual(found.format_id, fid)
            self.assertEqual(found.gtrid, gtrid)
            self.assertEqual(found.bqual, bqual)

            # Clean up so the next case again finds exactly one transaction.
            recoverer.tpc_rollback(found)
def test_copy_from_segfault(self):
        # issue #219
        # The COPY runs in a child interpreter and the test checks only its
        # exit status (presumably so a crash there cannot take the test
        # runner down with it).
        template = """\
import psycopg2
conn = psycopg2.connect(%(dsn)r)
curs = conn.cursor()
curs.execute("create table copy_segf (id int)")
try:
    curs.execute("copy copy_segf from stdin")
except psycopg2.ProgrammingError:
    pass
conn.close()
"""
        script = template % {'dsn': dsn}

        child = Popen([sys.executable, '-c', script_to_py3(script)])
        child.communicate()
        self.assertEqual(0, child.returncode)
test_notify.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def notify(self, name, sec=0, payload=None):
        """Start a child Python process that sends ``NOTIFY name``.

        The child sleeps ``sec`` seconds, connects using the module-level
        ``dsn``, switches to autocommit, prints its backend pid and issues
        the NOTIFY, optionally with ``payload``.

        Returns the ``Popen`` object; the child's stdout is a pipe.
        """
        # When given, the payload becomes the fragment ", '<payload>'" that
        # is appended to the NOTIFY statement below.
        if payload is None:
            payload = ''
        else:
            payload = ", %r" % payload

        # name and payload are inserted as repr'd string literals: in the
        # generated source the execute() argument consists of adjacent string
        # literals, which Python concatenates into one NOTIFY statement.
        # The template uses a Python 2 print statement; script_to_py3 is
        # presumably what makes it runnable on Python 3 (helper not shown).
        script = ("""\
import time
time.sleep(%(sec)s)
import %(module)s as psycopg2
import %(module)s.extensions as ext
conn = psycopg2.connect(%(dsn)r)
conn.set_isolation_level(ext.ISOLATION_LEVEL_AUTOCOMMIT)
print conn.get_backend_pid()
curs = conn.cursor()
curs.execute("NOTIFY " %(name)r %(payload)r)
curs.close()
conn.close()
""" % {
            'module': psycopg2.__name__,
            'dsn': dsn, 'sec': sec, 'name': name, 'payload': payload})

        return Popen([sys.executable, '-c', script_to_py3(script)], stdout=PIPE)
testutils.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def connect(self, **kwargs):
        """Open a psycopg2 connection and record it in ``self._conns``.

        A ``dsn`` keyword overrides the module-level ``dsn``; every other
        keyword is forwarded to ``psycopg2.connect``.  Raises AttributeError
        with a hint when ``self._conns`` does not exist.
        """
        try:
            self._conns
        except AttributeError as e:
            # "as" form: valid on Python 2.6+ and Python 3, unlike
            # "except AttributeError, e".
            raise AttributeError(
                "%s (did you forget to call ConnectingTestCase.setUp()?)"
                % e)

        if 'dsn' in kwargs:
            conninfo = kwargs.pop('dsn')
        else:
            conninfo = dsn
        import psycopg2
        conn = psycopg2.connect(conninfo, **kwargs)
        self._conns.append(conn)
        return conn
testutils.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def skip_if_tpc_disabled(f):
    """Decorator: skip the test unless the server allows prepared transactions."""
    @wraps(f)
    def skip_if_tpc_disabled_(self):
        from psycopg2 import ProgrammingError
        conn = self.connect()
        cursor = conn.cursor()
        try:
            cursor.execute("SHOW max_prepared_transactions;")
        except ProgrammingError:
            return self.skipTest(
                "server too old: two phase transactions not supported.")
        max_prepared = int(cursor.fetchone()[0])
        conn.close()

        if max_prepared:
            return f(self)
        return self.skipTest(
            "server not configured for two phase transactions. "
            "set max_prepared_transactions to > 0 to run the test")

    return skip_if_tpc_disabled_
test_connection.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_concurrent_execution(self):
        """Two threads each sleeping 4s in the server must finish in under 7s."""
        def slave():
            cnn = self.connect()
            cur = cnn.cursor()
            cur.execute("select pg_sleep(4)")
            cur.close()
            cnn.close()

        t1 = threading.Thread(target=slave)
        t2 = threading.Thread(target=slave)
        t0 = time.time()
        t1.start()
        t2.start()
        t1.join()
        t2.join()
        # assertTrue replaces the deprecated assert_ alias, which no longer
        # exists in Python 3.12.
        self.assertTrue(time.time() - t0 < 7,
            "something broken in concurrency")
test_connection.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 71 收藏 0 点赞 0 评论 0
def test_set_isolation_level_default(self):
        """Check that ISOLATION_LEVEL_DEFAULT restores the session default."""
        connection = self.connect()
        cursor = connection.cursor()

        # The session default is changed while in autocommit.
        connection.autocommit = True
        cursor.execute("set default_transaction_isolation to 'read committed'")

        # An explicit level overrides the default ...
        connection.autocommit = False
        serializable = ext.ISOLATION_LEVEL_SERIALIZABLE
        connection.set_isolation_level(serializable)
        self.assertEqual(connection.isolation_level, serializable)
        cursor.execute("show transaction_isolation")
        level = cursor.fetchone()[0]
        self.assertEqual(level, "serializable")

        # ... and resetting to DEFAULT brings the session value back.
        connection.rollback()
        connection.set_isolation_level(ext.ISOLATION_LEVEL_DEFAULT)
        cursor.execute("show transaction_isolation")
        level = cursor.fetchone()[0]
        self.assertEqual(level, "read committed")
test_connection.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_isolation_level_read_committed(self):
        """Rows inserted by the peer connection show up only once committed."""
        conn_a = self.connect()
        conn_b = self.connect()
        conn_b.set_isolation_level(ext.ISOLATION_LEVEL_READ_COMMITTED)

        count_query = "select count(*) from isolevel;"

        cur_a = conn_a.cursor()
        cur_a.execute(count_query)
        self.assertEqual(0, cur_a.fetchone()[0])
        conn_a.commit()

        cur_b = conn_b.cursor()
        cur_b.execute("insert into isolevel values (10);")
        cur_a.execute("insert into isolevel values (20);")

        # B: own row only, then both after A commits.
        cur_b.execute(count_query)
        self.assertEqual(1, cur_b.fetchone()[0])
        conn_a.commit()
        cur_b.execute(count_query)
        self.assertEqual(2, cur_b.fetchone()[0])

        # A: own row only, then both after B commits.
        cur_a.execute(count_query)
        self.assertEqual(1, cur_a.fetchone()[0])
        conn_b.commit()
        cur_a.execute(count_query)
        self.assertEqual(2, cur_a.fetchone()[0])
test_connection.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def clear_test_xacts(self):
        """Roll back every prepared transaction found for the test database."""
        connection = self.connect()
        connection.set_isolation_level(0)
        cursor = connection.cursor()
        lookup = "select gid from pg_prepared_xacts where database = %s"
        try:
            cursor.execute(lookup, (dbname,))
        except psycopg2.ProgrammingError:
            # The lookup itself failed; give up quietly.
            connection.rollback()
            connection.close()
            return

        # Materialise the ids before reusing the cursor for the rollbacks.
        prepared_ids = [record[0] for record in cursor]
        for prepared_id in prepared_ids:
            cursor.execute("rollback prepared %s;", (prepared_id,))
        connection.close()
test_connection.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_tpc_commit(self):
        """Walk a transaction through begin, prepare and commit."""
        connection = self.connect()
        transaction_id = connection.xid(1, "gtrid", "bqual")
        self.assertEqual(connection.status, ext.STATUS_READY)

        connection.tpc_begin(transaction_id)
        self.assertEqual(connection.status, ext.STATUS_BEGIN)

        # After the insert both counters are still expected to be zero.
        cursor = connection.cursor()
        cursor.execute("insert into test_tpc values ('test_tpc_commit');")
        self.assertEqual(0, self.count_xacts())
        self.assertEqual(0, self.count_test_records())

        # After prepare: one xact counted, still no record.
        connection.tpc_prepare()
        self.assertEqual(connection.status, ext.STATUS_PREPARED)
        self.assertEqual(1, self.count_xacts())
        self.assertEqual(0, self.count_test_records())

        # After commit: no xact counted, one record.
        connection.tpc_commit()
        self.assertEqual(connection.status, ext.STATUS_READY)
        self.assertEqual(0, self.count_xacts())
        self.assertEqual(1, self.count_test_records())


问题


面经


文章

微信
公众号

扫码关注公众号