test_mysql_kafka_offsets.py 文件源码

python
阅读 26 收藏 0 点赞 0 评论 0

项目:monasca-transform 作者: openstack 项目源码 文件源码
def test_update_offset_values(self):
        """Check that a second add() for the same key replaces the offsets.

        An offset spec is added twice with the same app name, topic and
        partition but with different from/until offsets. The spec read
        back through get_kafka_offsets() is then expected to report the
        second pair of offsets.
        """
        topic = uuidutils.generate_uuid()
        partition = random.randint(0, 1024)
        first_until = random.randint(0, sys.maxsize)
        first_from = random.randint(0, sys.maxsize)
        app_name = uuidutils.generate_uuid()
        # Key under which the spec is looked up in the returned mapping.
        lookup_key = "%s_%s_%s" % (app_name, topic, partition)

        batch_time = self.get_dummy_batch_time()

        # Arguments shared by both add() calls; only the offsets differ.
        spec_identity = dict(topic=topic,
                             partition=partition,
                             app_name=app_name,
                             batch_time_info=batch_time)

        self.kafka_offset_specs.add(from_offset=first_from,
                                    until_offset=first_until,
                                    **spec_identity)

        # Re-draw until each second offset differs from the first one, so
        # the final assertions can tell the two add() calls apart.
        while True:
            second_until = random.randint(0, sys.maxsize)
            if second_until != first_until:
                break

        while True:
            second_from = random.randint(0, sys.maxsize)
            if second_from != first_from:
                break

        self.kafka_offset_specs.add(from_offset=second_from,
                                    until_offset=second_until,
                                    **spec_identity)

        stored_offsets = self.kafka_offset_specs.get_kafka_offsets(app_name)
        refreshed_spec = stored_offsets.get(lookup_key)
        self.assertEqual(second_from, refreshed_spec.get_from_offset())
        self.assertEqual(second_until, refreshed_spec.get_until_offset())
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号