python类connection()的实例源码

test_with.py 文件源码 项目:userbase-sns-lambda 作者: fartashh 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_subclass_rollback(self):
        """Check that leaving a ``with`` block via an exception rolls back.

        A connection subclass records every ``rollback()`` call. The test
        raises inside the ``with`` block and then verifies that the override
        was invoked, that the connection status is ``STATUS_READY`` and that
        the inserted row cannot be read back.
        """
        rollbacks = []

        class MyConn(ext.connection):
            def rollback(self):
                # Record the call, then defer to the real implementation.
                rollbacks.append(None)
                super(MyConn, self).rollback()

        try:
            with self.connect(connection_factory=MyConn) as conn:
                curs = conn.cursor()
                curs.execute("insert into test_with values (11)")
                1/0  # deliberately raise inside the with block
        except ZeroDivisionError:
            pass
        else:
            # fail() is required here: asserting on a non-empty message
            # string is always true and would never flag the problem.
            self.fail("exception not raised")

        self.assertEqual(conn.status, ext.STATUS_READY)
        self.assertTrue(rollbacks)

        curs = conn.cursor()
        curs.execute("select * from test_with")
        self.assertEqual(curs.fetchall(), [])
test_with.py 文件源码 项目:userbase-sns-lambda 作者: fartashh 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_subclass_commit(self):
        """Check that a clean exit from a ``with`` block commits.

        A connection subclass records every ``commit()`` call. After the
        ``with`` block ends normally the test verifies that the override was
        invoked, that the connection status is ``STATUS_READY`` and that the
        inserted row can be read through ``self.conn``.
        """
        commits = []

        class MyConn(ext.connection):
            def commit(self):
                # Record the call, then defer to the real implementation.
                commits.append(None)
                super(MyConn, self).commit()

        with self.connect(connection_factory=MyConn) as conn:
            curs = conn.cursor()
            curs.execute("insert into test_with values (10)")

        self.assertEqual(conn.status, ext.STATUS_READY)
        # assertTrue instead of the deprecated assert_ alias, which newer
        # unittest versions no longer provide.
        self.assertTrue(commits)

        # Read back through self.conn rather than the connection that did
        # the insert (presumably a separate connection; verify in setUp).
        curs = self.conn.cursor()
        curs.execute("select * from test_with")
        self.assertEqual(curs.fetchall(), [(10,)])
test_with.py 文件源码 项目:Price-Comparator 作者: Thejas-1 项目源码 文件源码 阅读 44 收藏 0 点赞 0 评论 0
def test_subclass_commit(self):
        """Check that a clean exit from a ``with`` block commits.

        The connection factory is a subclass whose ``commit()`` logs each
        call. Once the ``with`` block finishes without error, the test
        asserts that the connection status is ``STATUS_READY``, that at
        least one commit was logged, and that ``self.conn`` sees the row.
        """
        commits = []

        class MyConn(ext.connection):
            def commit(self):
                # Log the call before delegating to the base class.
                commits.append(None)
                super(MyConn, self).commit()

        with self.connect(connection_factory=MyConn) as conn:
            curs = conn.cursor()
            curs.execute("insert into test_with values (10)")

        self.assertEqual(conn.status, ext.STATUS_READY)
        # The assert_ alias is deprecated and absent from recent unittest
        # releases; assertTrue is the supported spelling.
        self.assertTrue(commits)

        # The row is looked up via self.conn, not via ``conn`` itself.
        curs = self.conn.cursor()
        curs.execute("select * from test_with")
        self.assertEqual(curs.fetchall(), [(10,)])
test_with.py 文件源码 项目:Price-Comparator 作者: Thejas-1 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_subclass_rollback(self):
        """Check that an exception inside a ``with`` block triggers rollback.

        The connection factory is a subclass whose ``rollback()`` logs each
        call. A ``ZeroDivisionError`` is raised inside the block; afterwards
        the test asserts that the connection status is ``STATUS_READY``,
        that a rollback was logged and that the table reads back empty.
        """
        rollbacks = []

        class MyConn(ext.connection):
            def rollback(self):
                # Log the call before delegating to the base class.
                rollbacks.append(None)
                super(MyConn, self).rollback()

        try:
            with self.connect(connection_factory=MyConn) as conn:
                curs = conn.cursor()
                curs.execute("insert into test_with values (11)")
                1/0  # force an error while the transaction is open
        except ZeroDivisionError:
            pass
        else:
            # An assertion on a truthy string can never fail, so the missing
            # exception must be reported with fail().
            self.fail("exception not raised")

        self.assertEqual(conn.status, ext.STATUS_READY)
        self.assertTrue(rollbacks)

        curs = conn.cursor()
        curs.execute("select * from test_with")
        self.assertEqual(curs.fetchall(), [])
test_with.py 文件源码 项目:nmbs-realtime-feed 作者: datamindedbe 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_subclass_commit(self):
        """Check that the ``with`` block calls an overridden ``commit()``.

        ``MyConn.commit`` appends to ``commits`` on every call. After a
        successful ``with`` block the test expects a non-empty ``commits``
        list, a connection status of ``STATUS_READY`` and the inserted row
        to be returned when queried through ``self.conn``.
        """
        commits = []

        class MyConn(ext.connection):
            def commit(self):
                # Note the call, then run the inherited commit.
                commits.append(None)
                super(MyConn, self).commit()

        with self.connect(connection_factory=MyConn) as conn:
            curs = conn.cursor()
            curs.execute("insert into test_with values (10)")

        self.assertEqual(conn.status, ext.STATUS_READY)
        # assertTrue is used because the assert_ alias is deprecated and
        # missing from current unittest versions.
        self.assertTrue(commits)

        # Verify the data through self.conn instead of ``conn``.
        curs = self.conn.cursor()
        curs.execute("select * from test_with")
        self.assertEqual(curs.fetchall(), [(10,)])
test_async.py 文件源码 项目:nmbs-realtime-feed 作者: datamindedbe 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_connection_setup(self):
        """Check basic properties of ``self.conn`` and ``self.sync_conn``.

        Verifies the ``async_`` flag on both connections, that ``self.conn``
        is in autocommit with the default isolation level, and that server
        version, protocol version and encoding are populated.
        """
        # Creating (and discarding) a cursor must work on both connections.
        cur = self.conn.cursor()
        sync_cur = self.sync_conn.cursor()
        del cur, sync_cur

        # The supported assert* names are used throughout; the assert_ and
        # assertEquals aliases are deprecated and gone from recent unittest.
        self.assertTrue(self.conn.async_)
        self.assertFalse(self.sync_conn.async_)

        # the async connection should be autocommit
        self.assertTrue(self.conn.autocommit)
        self.assertEqual(self.conn.isolation_level,
                         ext.ISOLATION_LEVEL_DEFAULT)

        # check other properties to be found on the connection
        self.assertTrue(self.conn.server_version)
        self.assertIn(self.conn.protocol_version, (2, 3))
        self.assertIn(self.conn.encoding, ext.encodings)
test_async.py 文件源码 项目:nmbs-realtime-feed 作者: datamindedbe 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_set_parameters_while_async(self):
        """Check which connection calls are allowed while a query executes.

        After starting a query on ``self.conn`` (without waiting for it),
        reading the transaction status must work, whereas changing the
        client encoding or the isolation level must raise
        ``psycopg2.ProgrammingError``.
        """
        cur = self.conn.cursor()

        cur.execute("select 'c'")
        self.assertTrue(self.conn.isexecuting())

        # getting transaction status works
        # (assertEqual: the assertEquals alias is deprecated and removed
        # from recent unittest versions)
        self.assertEqual(self.conn.get_transaction_status(),
                         ext.TRANSACTION_STATUS_ACTIVE)
        self.assertTrue(self.conn.isexecuting())

        # setting connection encoding should fail
        self.assertRaises(psycopg2.ProgrammingError,
                          self.conn.set_client_encoding, "LATIN1")

        # same for transaction isolation
        self.assertRaises(psycopg2.ProgrammingError,
                          self.conn.set_isolation_level, 1)
sql.py 文件源码 项目:nmbs-realtime-feed 作者: datamindedbe 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def as_string(self, context):
        """Return the quoted form of ``self._wrapped`` for *context*.

        *context* must be a connection or a cursor; for a cursor, its
        ``connection`` attribute is used. The wrapped value is adapted with
        ``ext.adapt``, prepared against the connection when the adapter has
        a ``prepare`` method, and quoted with ``getquoted()``. On Python 3 a
        ``bytes`` result is decoded using the connection's encoding.

        Raises:
            TypeError: if *context* is neither a connection nor a cursor.
        """
        # Resolve the connection that drives adaptation and decoding.
        if isinstance(context, ext.connection):
            connection = context
        elif not isinstance(context, ext.cursor):
            raise TypeError("context must be a connection or a cursor")
        else:
            connection = context.connection

        adapter = ext.adapt(self._wrapped)
        if hasattr(adapter, 'prepare'):
            adapter.prepare(connection)

        quoted = adapter.getquoted()
        # Only Python 3 bytes need decoding; anything else is returned as is.
        if sys.version_info[0] < 3 or not isinstance(quoted, bytes):
            return quoted
        return quoted.decode(ext.encodings[connection.encoding])
test_with.py 文件源码 项目:aws-lambda-redshift-copy 作者: christianhxc 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_subclass_commit(self):
        """Check that exiting ``with`` normally uses the subclass commit.

        ``MyConn`` overrides ``commit()`` to record each invocation. The
        test inserts a row inside a ``with`` block and then asserts the
        connection status is ``STATUS_READY``, that a commit was recorded
        and that the row is returned by a query on ``self.conn``.
        """
        commits = []

        class MyConn(ext.connection):
            def commit(self):
                # Keep a trace of the call and then really commit.
                commits.append(None)
                super(MyConn, self).commit()

        with self.connect(connection_factory=MyConn) as conn:
            curs = conn.cursor()
            curs.execute("insert into test_with values (10)")

        self.assertEqual(conn.status, ext.STATUS_READY)
        # Use assertTrue: the assert_ alias is deprecated and has been
        # dropped from recent unittest versions.
        self.assertTrue(commits)

        # Query through self.conn rather than through ``conn``.
        curs = self.conn.cursor()
        curs.execute("select * from test_with")
        self.assertEqual(curs.fetchall(), [(10,)])
test_with.py 文件源码 项目:aws-lambda-redshift-copy 作者: christianhxc 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_subclass_rollback(self):
        """Check that a failing ``with`` block uses the subclass rollback.

        ``MyConn`` overrides ``rollback()`` to record each invocation. The
        test raises ``ZeroDivisionError`` after an insert inside the
        ``with`` block and then asserts the connection status is
        ``STATUS_READY``, that a rollback was recorded and that no row is
        returned by a follow-up query.
        """
        rollbacks = []

        class MyConn(ext.connection):
            def rollback(self):
                # Keep a trace of the call and then really roll back.
                rollbacks.append(None)
                super(MyConn, self).rollback()

        try:
            with self.connect(connection_factory=MyConn) as conn:
                curs = conn.cursor()
                curs.execute("insert into test_with values (11)")
                1/0  # intentional error to abort the block
        except ZeroDivisionError:
            pass
        else:
            # Reaching this branch means the error was swallowed; fail()
            # reports it, whereas asserting on a string always passes.
            self.fail("exception not raised")

        self.assertEqual(conn.status, ext.STATUS_READY)
        self.assertTrue(rollbacks)

        curs = conn.cursor()
        curs.execute("select * from test_with")
        self.assertEqual(curs.fetchall(), [])
test_with.py 文件源码 项目:PyEloqua-Examples 作者: colemanja91 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_subclass_commit(self):
        """Check that a clean ``with`` exit goes through the subclass commit.

        A connection subclass logs each ``commit()`` call. After inserting a
        row inside a ``with`` block, the test expects the connection status
        to be ``STATUS_READY``, at least one logged commit, and the row to
        be readable via ``self.conn``.
        """
        commit_calls = []

        class CommitLoggingConn(ext.connection):
            def commit(self):
                # Log first, then perform the inherited commit.
                commit_calls.append(None)
                super(CommitLoggingConn, self).commit()

        with self.connect(connection_factory=CommitLoggingConn) as conn:
            writer = conn.cursor()
            writer.execute("insert into test_with values (10)")

        self.assertEqual(conn.status, ext.STATUS_READY)
        self.assertTrue(commit_calls)

        # The check runs on self.conn, not on the connection used above.
        reader = self.conn.cursor()
        reader.execute("select * from test_with")
        self.assertEqual(reader.fetchall(), [(10,)])
test_with.py 文件源码 项目:PyEloqua-Examples 作者: colemanja91 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_subclass_rollback(self):
        """Check that an error inside ``with`` calls the subclass rollback.

        A connection subclass logs each ``rollback()`` call. The test raises
        ``ZeroDivisionError`` after an insert inside the ``with`` block and
        then expects the connection status to be ``STATUS_READY``, at least
        one logged rollback, and an empty result from the table.
        """
        rollbacks = []

        class MyConn(ext.connection):
            def rollback(self):
                # Log first, then perform the inherited rollback.
                rollbacks.append(None)
                super(MyConn, self).rollback()

        try:
            with self.connect(connection_factory=MyConn) as conn:
                curs = conn.cursor()
                curs.execute("insert into test_with values (11)")
                1/0  # raise on purpose inside the with block
        except ZeroDivisionError:
            pass
        else:
            # assertTrue() on a non-empty string always succeeds, so the
            # failure has to be signalled explicitly with fail().
            self.fail("exception not raised")

        self.assertEqual(conn.status, ext.STATUS_READY)
        self.assertTrue(rollbacks)

        curs = conn.cursor()
        curs.execute("select * from test_with")
        self.assertEqual(curs.fetchall(), [])
test_with.py 文件源码 项目:flask 作者: bobohope 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_subclass_rollback(self):
        """Check that ``with`` rolls back through the subclass on error.

        ``MyConn.rollback`` appends to ``rollbacks`` on every call. After a
        ``ZeroDivisionError`` escapes the ``with`` block, the test expects a
        non-empty ``rollbacks`` list, a connection status of
        ``STATUS_READY`` and no rows in ``test_with``.
        """
        rollbacks = []

        class MyConn(ext.connection):
            def rollback(self):
                # Note the call, then run the inherited rollback.
                rollbacks.append(None)
                super(MyConn, self).rollback()

        try:
            with self.connect(connection_factory=MyConn) as conn:
                curs = conn.cursor()
                curs.execute("insert into test_with values (11)")
                1 / 0  # deliberate failure while the block is active
        except ZeroDivisionError:
            pass
        else:
            # A truthy-string assertion cannot fail; fail() makes the test
            # actually break when no exception propagated.
            self.fail("exception not raised")

        self.assertEqual(conn.status, ext.STATUS_READY)
        self.assertTrue(rollbacks)

        curs = conn.cursor()
        curs.execute("select * from test_with")
        self.assertEqual(curs.fetchall(), [])
test_async.py 文件源码 项目:flask 作者: bobohope 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_connection_setup(self):
        """Check the initial state of the async and sync connections.

        Asserts that ``self.conn.async_`` is set and ``self.sync_conn.async_``
        is not, that ``self.conn`` is autocommit with the default isolation
        level, and that version and encoding attributes hold usable values.
        """
        # Cursors can be created on both connections and simply discarded.
        cur = self.conn.cursor()
        sync_cur = self.sync_conn.cursor()
        del cur, sync_cur

        # Deprecated aliases (assert_, assertEquals) are avoided because
        # recent unittest versions no longer ship them.
        self.assertTrue(self.conn.async_)
        self.assertFalse(self.sync_conn.async_)

        # the async connection should be autocommit
        self.assertTrue(self.conn.autocommit)
        self.assertEqual(self.conn.isolation_level,
                         ext.ISOLATION_LEVEL_DEFAULT)

        # check other properties to be found on the connection
        self.assertTrue(self.conn.server_version)
        self.assertIn(self.conn.protocol_version, (2, 3))
        self.assertIn(self.conn.encoding, ext.encodings)
test_async.py 文件源码 项目:flask 作者: bobohope 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_set_parameters_while_async(self):
        """Check that settings cannot be changed while a query is running.

        With a query started on ``self.conn`` and still reported as
        executing, the transaction status can be read, but
        ``set_client_encoding`` and ``set_isolation_level`` must raise
        ``psycopg2.ProgrammingError``.
        """
        cur = self.conn.cursor()

        cur.execute("select 'c'")
        self.assertTrue(self.conn.isexecuting())

        # getting transaction status works
        # (assertEqual replaces the deprecated assertEquals alias, which
        # recent unittest versions no longer provide)
        self.assertEqual(self.conn.get_transaction_status(),
                         ext.TRANSACTION_STATUS_ACTIVE)
        self.assertTrue(self.conn.isexecuting())

        # setting connection encoding should fail
        self.assertRaises(psycopg2.ProgrammingError,
                          self.conn.set_client_encoding, "LATIN1")

        # same for transaction isolation
        self.assertRaises(psycopg2.ProgrammingError,
                          self.conn.set_isolation_level, 1)
sql.py 文件源码 项目:flask 作者: bobohope 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def as_string(self, context):
        """Quote ``self._wrapped`` using the connection behind *context*.

        *context* is either a connection, used directly, or a cursor, whose
        ``connection`` attribute is used. The value is adapted through
        ``ext.adapt``; if the adapter exposes ``prepare`` it is called with
        the connection before ``getquoted()``. A ``bytes`` result is decoded
        with the connection's encoding when running on Python 3.

        Raises:
            TypeError: if *context* is neither a connection nor a cursor.
        """
        # is it a connection or cursor?
        if isinstance(context, ext.connection):
            target = context
        else:
            if not isinstance(context, ext.cursor):
                raise TypeError("context must be a connection or a cursor")
            target = context.connection

        adapted = ext.adapt(self._wrapped)
        if hasattr(adapted, 'prepare'):
            adapted.prepare(target)

        result = adapted.getquoted()
        if sys.version_info[0] >= 3:
            # Python 3: hand back text rather than raw bytes.
            if isinstance(result, bytes):
                result = result.decode(ext.encodings[target.encoding])

        return result
postgresql.py 文件源码 项目:talisker 作者: canonical-ols 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _get_data(self, query, t):
        """Return a ``(query, duration, connection)`` tuple for recording.

        *query* may be a callable, in which case it is called to obtain the
        actual value. The value is passed through ``prettify_sql``; a
        ``None`` result is replaced by ``FILTERED``. *t* is rounded to three
        decimals and converted to ``float``. The connection element is the
        result of ``get_safe_connection_string(self)``.
        """
        sql = query() if callable(query) else query
        sql = prettify_sql(sql)
        if sql is None:
            sql = FILTERED
        return sql, float(round(t, 3)), get_safe_connection_string(self)
postgresql.py 文件源码 项目:talisker 作者: canonical-ols 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def execute(self, query, vars=None):
        """Run the parent ``execute`` and record the call on the connection.

        The elapsed wall-clock time is converted to milliseconds and passed
        to ``self.connection._record`` whether or not the parent call
        raises. The query text is only passed along when *vars* is given;
        otherwise ``None`` is recorded in its place (presumably because a
        query without separate parameters may embed literal values).
        """
        started = time.time()
        try:
            return super(TaliskerCursor, self).execute(query, vars)
        finally:
            elapsed_ms = (time.time() - started) * 1000
            recorded = query if vars is not None else None
            self.connection._record('query', recorded, elapsed_ms)
postgresql.py 文件源码 项目:talisker 作者: canonical-ols 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def callproc(self, procname, vars=None):
        """Run the parent ``callproc`` and record the call on the connection.

        The elapsed wall-clock time is converted to milliseconds and passed
        to ``self.connection._record`` whether or not the parent call
        raises. The record is labelled with the procedure name and never
        carries a query.
        """
        started = time.time()
        try:
            return super(TaliskerCursor, self).callproc(procname, vars)
        finally:
            elapsed_ms = (time.time() - started) * 1000
            label = 'stored proc: {}'.format(procname)
            # no query parameters, cannot safely record
            self.connection._record(label, None, elapsed_ms)
test_connection.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_cursor_closed_attribute(self):
        """Check the ``closed`` attribute of cursors created on ``self.conn``.

        A new cursor must report ``closed`` as false until ``close()`` is
        called on it, and a cursor must also report ``closed`` as true once
        its connection has been closed.
        """
        connection = self.conn

        # Explicit close flips the flag from False to True.
        cur = connection.cursor()
        self.assertEqual(cur.closed, False)
        cur.close()
        self.assertEqual(cur.closed, True)

        # Closing the connection closes the cursor:
        cur = connection.cursor()
        connection.close()
        self.assertEqual(cur.closed, True)


问题


面经


文章

微信
公众号

扫码关注公众号