test_udts.py source code

python

Project: deb-python-cassandra-driver  Author: openstack
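from collections import namedtuple

from cassandra import InvalidRequest
from cassandra.cluster import Cluster, UserTypeDoesNotExist
# Assumption: PROTOCOL_VERSION comes from the driver's integration-test helpers.
from tests.integration import PROTOCOL_VERSION


def test_raise_error_on_nonexisting_udts(self):
        """
        Ensure that an error is raised when operating on a nonexistent UDT or an invalid keyspace.
        """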

        c = Cluster(protocol_version=PROTOCOL_VERSION)
        s = c.connect(self.keyspace_name, wait_for_all_pools=True)
        User = namedtuple('user', ('age', 'name'))

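        # Registering a type against a keyspace that does not exist must fail.
        with self.assertRaises(UserTypeDoesNotExist):
            c.register_user_type("some_bad_keyspace", "user", User)

        # The "system" keyspace exists, but it defines no type named "user".
        with self.assertRaises(UserTypeDoesNotExist):
            c.register_user_type("system", "user", User)

        # The UDT "user" was never created, so the server should reject this table definition.
        with self.assertRaises(InvalidRequest):
            s.execute("CREATE TABLE mytable (a int PRIMARY KEY, b frozen<user>)")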

        c.shutdown()