def test_update_offset_values(self):
topic_1 = uuidutils.generate_uuid()
partition_1 = random.randint(0, 1024)
until_offset_1 = random.randint(0, sys.maxsize)
from_offset_1 = random.randint(0, sys.maxsize)
app_name_1 = uuidutils.generate_uuid()
offset_key_1 = "%s_%s_%s" % (app_name_1, topic_1, partition_1)
my_batch_time = self.get_dummy_batch_time()
self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
app_name=app_name_1,
from_offset=from_offset_1,
until_offset=until_offset_1,
batch_time_info=my_batch_time)
until_offset_2 = random.randint(0, sys.maxsize)
while until_offset_2 == until_offset_1:
until_offset_2 = random.randint(0, sys.maxsize)
from_offset_2 = random.randint(0, sys.maxsize)
while from_offset_2 == from_offset_1:
from_offset_2 = random.randint(0, sys.maxsize)
self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
app_name=app_name_1,
from_offset=from_offset_2,
until_offset=until_offset_2,
batch_time_info=my_batch_time)
kafka_offset_specs = self.kafka_offset_specs.get_kafka_offsets(
app_name_1)
updated_offset_value = kafka_offset_specs.get(offset_key_1)
self.assertEqual(from_offset_2, updated_offset_value.get_from_offset())
self.assertEqual(until_offset_2,
updated_offset_value.get_until_offset())
test_mysql_kafka_offsets.py 文件源码
python
阅读 26
收藏 0
点赞 0
评论 0
评论列表
文章目录