test_udts.py 文件源码

python
阅读 25 收藏 0 点赞 0 评论 0

项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码
def test_non_frozen_udts(self):
        """
        Test to ensure that non frozen udt's work with C* >3.6.

        @since 3.7.0
        @jira_ticket PYTHON-498
        @expected_result Non frozen UDT's are supported

        @test_category data_types, udt
        """
        self.session.execute("USE {0}".format(self.keyspace_name))
        self.session.execute("CREATE TYPE user (state text, has_corn boolean)")
        self.session.execute("CREATE TABLE {0} (a int PRIMARY KEY, b user)".format(self.function_table_name))
        User = namedtuple('user', ('state', 'has_corn'))
        self.cluster.register_user_type(self.keyspace_name, "user", User)
        self.session.execute("INSERT INTO {0} (a, b) VALUES (%s, %s)".format(self.function_table_name), (0, User("Nebraska", True)))
        self.session.execute("UPDATE {0} SET b.has_corn = False where a = 0".format(self.function_table_name))
        result = self.session.execute("SELECT * FROM {0}".format(self.function_table_name))
        self.assertFalse(result[0].b.has_corn)
        table_sql = self.cluster.metadata.keyspaces[self.keyspace_name].tables[self.function_table_name].as_cql_query()
        self.assertNotIn("<frozen>", table_sql)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号